diff --git a/src/dodal/plans/wrapped.py b/src/dodal/plans/wrapped.py index 1ea01906db8..ef17aaad961 100644 --- a/src/dodal/plans/wrapped.py +++ b/src/dodal/plans/wrapped.py @@ -1,6 +1,6 @@ from collections.abc import Sequence from decimal import Decimal -from typing import Annotated, Any +from typing import Annotated, Any, TypeVar import bluesky.plans as bp import numpy as np @@ -11,20 +11,23 @@ from dodal.common import MsgGenerator from dodal.plan_stubs.data_session import attach_data_session_metadata_decorator -"""This module wraps plan(s) from bluesky.plans until required handling for them is -moved into bluesky or better handled in downstream services. - -Required decorators are installed on plan import +"""This module wraps plan(s) from bluesky.plans so they are compatible with blueapi. +Required decorators are installed on plan import. https://github.com/DiamondLightSource/blueapi/issues/474 -Non-serialisable fields are ignored when they are optional +Non-serialisable fields are ignored when they are optional. https://github.com/DiamondLightSource/blueapi/issues/711 +Using *args in plans is currently not supported. +https://github.com/DiamondLightSource/blueapi/issues/1450 + We may also need other adjustments for UI purposes, e.g. - Forcing uniqueness or orderedness of Readables. - Limits and metadata (e.g. units). """ +T = TypeVar("T") + @attach_data_session_metadata_decorator() @validate_call(config={"arbitrary_types_allowed": True}) @@ -62,33 +65,6 @@ def count( yield from bp.count(tuple(detectors), num, delay=delay, md=metadata) -def _make_num_scan_args( - params: list[tuple[Movable, list[float | int]]], num: int | None = None -): - shape = [] - if num: - shape = [num] - for param in params: - if len(param[1]) == 2: - pass - else: - raise ValueError("You must provide 'start stop' for each motor.") - else: - for param in params: - if len(param[1]) == 3: - shape.append(param[1][-1]) - else: - raise ValueError( - "You must provide 'start stop num' for each motor in a grid scan." - ) - - args = [] - for param in params: - args.append(param[0]) - args.extend(param[1]) - return args, shape - - @validate_call(config={"arbitrary_types_allowed": True}) def num_scan( detectors: Annotated[ @@ -98,11 +74,11 @@ def num_scan( ), ], params: Annotated[ - list[tuple[Movable, list[float | int]]], + Sequence[Movable | float | int], Field( description="List of tuples (device, parameter). For concurrent " - "trajectories, provide '[(movable1, [start1, stop1]), (movable2, [start2, " - "stop2]), ... , (movableN, [startN, stopN])]'." + "trajectories, provide '[movable1, start1, stop1, movable2, start2, stop2, " + "... , movableN, startN, stopN]'." ), ], num: int, @@ -113,12 +89,10 @@ def num_scan( The scan is defined by number of points along scan trajector(y/ies). Wraps bluesky.plans.scan(det, *args, num, md=metadata). """ - # TODO: move to using Range spec and spec_scan when stable and tested at v1.0 - args, shape = _make_num_scan_args(params, num) metadata = metadata or {} - metadata["shape"] = shape + metadata["shape"] = (num,) - yield from bp.scan(tuple(detectors), *args, num=num, md=metadata) + yield from bp.scan(tuple(detectors), *params, num=num, md=metadata) @validate_call(config={"arbitrary_types_allowed": True}) @@ -130,7 +104,7 @@ def num_grid_scan( ), ], params: Annotated[ - list[tuple[Movable, list[float | int]]], + Sequence[Movable | float | int], Field( description="List of tuples (device, parameter). For independent \ trajectories, provide '[(movable1, [start1, stop1, num1]), (movable2, \ @@ -146,12 +120,9 @@ def num_grid_scan( axes by default (all axes but the first axis provided). Wraps bluesky.plans.grid_scan(det, *args, snake_axes, md=metadata). """ - # TODO: move to using Range spec and spec_scan when stable and tested at v1.0 - args, shape = _make_num_scan_args(params) - metadata = metadata or {} - metadata["shape"] = shape - - yield from bp.grid_scan(tuple(detectors), *args, snake_axes=snake_axes, md=metadata) + yield from bp.grid_scan( + tuple(detectors), *params, snake_axes=snake_axes, md=metadata + ) @validate_call(config={"arbitrary_types_allowed": True}) @@ -163,11 +134,11 @@ def num_rscan( ), ], params: Annotated[ - list[tuple[Movable, list[float | int]]], + Sequence[Movable | float | int], Field( description="List of tuples (device, parameter). For concurrent \ - trajectories, provide '[(movable1, [start1, stop1]), (movable2, [start2, \ - stop2]), ... , (movableN, [startN, stopN])]'." + trajectories, provide '[movable1, start1, stop1, movable2, start2, stop2, \ + ... , movableN, startN, stopN]'." ), ], num: int | None = None, @@ -178,12 +149,10 @@ def num_rscan( The scan is defined by number of points along scan trajector(y/ies). Wraps bluesky.plans.rel_scan(det, *args, num, md=metadata). """ - # TODO: move to using Range spec and spec_scan when stable and tested at v1.0 - args, shape = _make_num_scan_args(params, num) metadata = metadata or {} - metadata["shape"] = shape + metadata["shape"] = (num,) - yield from bp.rel_scan(tuple(detectors), *args, num=num, md=metadata) + yield from bp.rel_scan(tuple(detectors), *params, num=num, md=metadata) @validate_call(config={"arbitrary_types_allowed": True}) @@ -195,7 +164,7 @@ def num_grid_rscan( ), ], params: Annotated[ - list[tuple[Movable, list[float | int]]], + Sequence[Movable | float | int], Field( description="List of tuples (device, parameter). For independent \ trajectories, provide '[(movable1, [start1, stop1, num1]), (movable2, \ @@ -211,30 +180,25 @@ def num_grid_rscan( axes by default (all axes but the first axis provided). Wraps bluesky.plans.rel_grid_scan(det, *args, snake_axes, md=metadata). """ - # TODO: move to using Range spec and spec_scan when stable and tested at v1.0 - args, shape = _make_num_scan_args(params) - metadata = metadata or {} - metadata["shape"] = shape - yield from bp.rel_grid_scan( - tuple(detectors), *args, snake_axes=snake_axes, md=metadata + tuple(detectors), *params, snake_axes=snake_axes, md=metadata ) -def _make_list_scan_args(params: list[tuple[Movable, list[float | int]]], grid: bool): +def _make_list_scan_shape( + params: Sequence[Movable | list[float | int]], grid: bool +) -> tuple[int, ...]: shape = [] - args = [] for param in params: - shape.append(len(param[1])) - args.append(param[0]) - args.append(param[1]) - - if not grid: - shape = list(set(shape)) - if len(shape) > 1: - raise ValueError("Lists of motor positions are not equal in length.") + # List arg must all be same size. If list missing or not same size, this will + # be validated by bp.list_scan. + if isinstance(param, list): + dim = len(param) + shape.append(dim) + if not grid: + break - return args, shape + return tuple(shape) @validate_call(config={"arbitrary_types_allowed": True}) @@ -246,7 +210,7 @@ def list_scan( ), ], params: Annotated[ - list[tuple[Movable, list[float | int]]], + list[Movable | list[float | int]], Field( description="List of tuples (device, positions). For concurrent \ trajectories, provide '[(movable1, [point1, point2, ...]), (movable2, \ @@ -261,11 +225,10 @@ def list_scan( The scan is defined by providing a list of points for each scan trajectory. Wraps bluesky.plans.list_scan(det, *args, md=metadata). """ - args, shape = _make_list_scan_args(params=params, grid=False) metadata = metadata or {} - metadata["shape"] = shape + metadata["shape"] = _make_list_scan_shape(params, grid=False) - yield from bp.list_scan(tuple(detectors), *args, md=metadata) + yield from bp.list_scan(tuple(detectors), *tuple(params), md=metadata) # type: ignore @validate_call(config={"arbitrary_types_allowed": True}) @@ -277,11 +240,11 @@ def list_grid_scan( ), ], params: Annotated[ - list[tuple[Movable, list[float | int]]], + Sequence[Movable | list[float | int]], Field( - description="List of tuples (device, positions). For independent \ - trajectories, provide '[(movable1, [point1, point2, ...]), (movable2, \ - [point1, point2, ...]), ... , (movableN, [point1, point2, ...])]'." + description="For independent trajectories, provide" + "'[movable1, [point1, point2, ...], movable2, [point1, point2, ...], ..., " + "movableN, [point1, point2, ...]]'." ), ], snake_axes: bool = True, # Currently specifying axes to snake is not supported @@ -293,12 +256,11 @@ def list_grid_scan( all fast axes by default (all axes but the first axis provided). Wraps bluesky.plans.list_grid_scan(det, *args, md=metadata). """ - args, shape = _make_list_scan_args(params=params, grid=True) metadata = metadata or {} - metadata["shape"] = shape + metadata["shape"] = _make_list_scan_shape(params, grid=True) yield from bp.list_grid_scan( - tuple(detectors), *args, snake_axes=snake_axes, md=metadata + tuple(detectors), *params, snake_axes=snake_axes, md=metadata ) @@ -311,11 +273,11 @@ def list_rscan( ), ], params: Annotated[ - list[tuple[Movable, list[float | int]]], + Sequence[Movable | list[float | int]], Field( - description="List of tuples (device, positions). For concurrent \ - trajectories, provide '[(movable1, [point1, point2, ...]), (movable2, \ - [point1, point2, ...]), ... , (movableN, [point1, point2, ...])]'. Number \ + description="For concurrent trajectories, provide " + "'[movable1, [point1, point2, ...], movable2, [point1, point2, ...], ..., " + "movableN, [point1, point2, ...]]'. Number \ of points for each movable must be equal." ), ], @@ -326,11 +288,9 @@ def list_rscan( The scan is defined by providing a list of points for each scan trajectory. Wraps bluesky.plans.rel_list_scan(det, *args, md=metadata). """ - args, shape = _make_list_scan_args(params=params, grid=False) metadata = metadata or {} - metadata["shape"] = shape - - yield from bp.rel_list_scan(tuple(detectors), *args, md=metadata) + metadata["shape"] = _make_list_scan_shape(params, grid=False) + yield from bp.rel_list_scan(tuple(detectors), *params, md=metadata) @validate_call(config={"arbitrary_types_allowed": True}) @@ -342,11 +302,11 @@ def list_grid_rscan( ), ], params: Annotated[ - list[tuple[Movable, list[float | int]]], + Sequence[Movable | list[float | int]], Field( - description="List of tuples (device, positions). For independent \ - trajectories, provide '[(movable1, [point1, point2, ...]), (movable2, \ - [point1, point2, ...]), ... , (movableN, [point1, point2, ...])]'." + description="For independent trajectories, provide " + "'[movable1, [point1, point2, ...], movable2, [point1, point2, ...], ... , " + "movableN, [point1, point2, ...]]'." ), ], snake_axes: bool = True, # Currently specifying axes to snake is not supported @@ -358,16 +318,16 @@ def list_grid_rscan( all fast axes by default (all axes but the first axis provided). Wraps bluesky.plans.rel_list_grid_scan(det, *args, md=metadata). """ - args, shape = _make_list_scan_args(params=params, grid=True) metadata = metadata or {} - metadata["shape"] = shape - + metadata["shape"] = _make_list_scan_shape(params, grid=True) yield from bp.rel_list_grid_scan( - tuple(detectors), *args, snake_axes=snake_axes, md=metadata + tuple(detectors), *params, snake_axes=snake_axes, md=metadata ) -def _round_list_elements(stepped_list, params) -> list[float]: +def _round_list_elements( + stepped_list: list[float | int], params: list[float | int] +) -> list[float | int]: decimals = [Decimal(str(param)) for param in params] exponents = [d.as_tuple().exponent for d in decimals] decimal_places = [-exponent for exponent in exponents] # type: ignore @@ -375,11 +335,15 @@ def _round_list_elements(stepped_list, params) -> list[float]: return np.round(stepped_list, decimals=max_decimal_places).tolist() -def _make_stepped_list_step(start: float, stop: float, step: float) -> list: +def _make_stepped_list_step( + start: float, stop: float, step: float +) -> list[float | int]: if start == stop: raise ValueError( f"Start ({start}) and stop ({stop}) values cannot be the same." ) + if step == 0: + raise ValueError(f"Step size {step} cannot be zero.") if abs(step) > abs(stop - start): step = stop - start step = abs(step) * np.sign(stop - start) @@ -392,7 +356,11 @@ def _make_stepped_list_step(start: float, stop: float, step: float) -> list: return rounded_stepped_list -def _make_stepped_list_num(start, step, num) -> list: +def _make_stepped_list_num(start: float, step: float, num: int) -> list[float | int]: + if num == 0 or step == 0: + raise ValueError( + f"Number of points ({num}) and number of steps ({step}) cannot be zero." + ) stepped_list = [start + (n * step) for n in range(num)] rounded_stepped_list = _round_list_elements( stepped_list=stepped_list, params=[start, step] @@ -400,49 +368,82 @@ def _make_stepped_list_num(start, step, num) -> list: return rounded_stepped_list -def _make_step_scan_args( - params: list[tuple[Movable, list[float | int]]], grid: bool -) -> tuple[list[Any], list[float]]: - args = [] - shape = [] - stepped_list_length = None - - first_movable_param, *additional_movable_params = params - if len(first_movable_param[1]) == 3: - start, stop, step = first_movable_param[1] - stepped_list = _make_stepped_list_step(start, stop, step) - stepped_list_length = len(stepped_list) - args.append(first_movable_param[0]) - args.append(stepped_list) - shape.append(stepped_list_length) - else: +def require( + value: object, + expected: type[T] | tuple[type[T], ...], + name: str, +) -> T: + expected_tuple = expected if isinstance(expected, tuple) else (expected,) + if not isinstance(value, expected_tuple): + allowed = ", ".join(t.__name__ for t in expected_tuple) + raise ValueError( + f"Parameter {name} must be one of type {allowed}, got {type(value).__name__}." + ) + return value # type: ignore[return-value] + + +def parse_full_axis( + values: Sequence[Movable | float | int], +) -> tuple[Movable, float, float, float]: + if len(values) != 4: raise ValueError( - f"You provided {len(first_movable_param[1])} parameters for {first_movable_param[0]}, rather than 3." + f"The axis must be movable, start, stop, step. You provided {values}" ) - for param in additional_movable_params: - if grid: - if len(param[1]) == 3: - start, stop, step = param[1] - stepped_list = _make_stepped_list_step(start, stop, step) - args.append(param[0]) - args.append(stepped_list) - shape.append(len(stepped_list)) - else: - raise ValueError( - f"You provided {len(param[1])} parameters for {param[0]}, rather than 3." - ) + movable = require(values[0], Movable, "movable") + start = require(values[1], (int, float), "start") + stop = require(values[2], (int, float), "stop") + step = require(values[3], (int, float), "step") + return movable, start, stop, step + + +def parse_relative_axis( + values: Sequence[Movable | float | int], +) -> tuple[Movable, float, float]: + if len(values) != 3: + raise ValueError( + f"The axis must be movable, start, stop. You provided {', '.join(map(str, values))}" + ) + movable = require(values[0], Movable, "movable") + start = require(values[1], (int, float), "start") + step = require(values[2], (int, float), "step") + return movable, start, step + + +def _make_step_scan_args_and_shape( + params: Sequence[Movable | float | int], grid: bool +) -> tuple[list[Movable | list[float]], tuple[int, ...]]: + """Convert [x, 1, 4, 1, ...] to [x, [1, 2, 3, 4], ...].""" + list_of_movable_with_values: list[list[Movable | float | int]] = [] + current_list: list[Movable | float | int] = [] + for param in params: + if isinstance(param, Movable): + current_list = [param] + list_of_movable_with_values.append(current_list) + elif isinstance(param, (int, float)): + current_list.append(param) else: - if len(param[1]) == 2: - start, step = param[1] - stepped_list = _make_stepped_list_num(start, step, stepped_list_length) - args.append(param[0]) - args.append(stepped_list) - else: - raise ValueError( - f"You provided {len(param[1])} parameters {param[0]}, rather than 2." - ) + raise ValueError( + "Scan syntax only takes movables or numbers as parameters. " + f'You provided "{param}".' + ) - return args, shape + step_scan_args: list[Movable | list[float]] = [] + shape = [] + first_axis = True + for movable_with_values in list_of_movable_with_values: + if first_axis or grid: + movable, start, stop, step = parse_full_axis(movable_with_values) + movable_values = _make_stepped_list_step(start, stop, step) + shape.append(len(movable_values)) + first_axis = False + else: + # If not a grid scan, expects start, stop for all other axes and use the + # first axis shape for the number of steps. + movable, start, step = parse_relative_axis(movable_with_values) + movable_values = _make_stepped_list_num(start, step, shape[0]) + step_scan_args.extend([movable, movable_values]) + + return step_scan_args, tuple(shape) @validate_call(config={"arbitrary_types_allowed": True}) @@ -454,11 +455,11 @@ def step_scan( ), ], params: Annotated[ - list[tuple[Movable, list[float | int]]], + Sequence[Movable | float | int], Field( - description="List of tuples (device, parameter). For concurrent \ - trajectories, provide '[(movable1, [start1, stop1, step1]), (movable2, \ - [start2, step2]), ... , (movableN, [startN, stepN])]'." + description="For concurrent trajectories, provide " + "'[movable1, start1, stop1, step1, movable2, start2, step2, ... , " + "movableN, startN, stepN]'." ), ], metadata: dict[str, Any] | None = None, @@ -469,11 +470,11 @@ def step_scan( bluesky.plans.list_scan(det, *args, md=metadata). """ # TODO: move to using Linspace spec and spec_scan when stable and tested at v1.0 - args, shape = _make_step_scan_args(params, grid=False) + args, shape = _make_step_scan_args_and_shape(params, grid=False) metadata = metadata or {} metadata["shape"] = shape - yield from bp.list_scan(tuple(detectors), *args, md=metadata) + yield from bp.list_scan(tuple(detectors), *tuple(args), md=metadata) # type: ignore @validate_call(config={"arbitrary_types_allowed": True}) @@ -485,11 +486,11 @@ def step_grid_scan( ), ], params: Annotated[ - list[tuple[Movable, list[float | int]]], + Sequence[Movable | float | int], Field( description="List of tuples (device, parameter). For independent \ - trajectories, provide '[(movable1, [start1, stop1, step1]), (movable2, \ - [start2, stop2, step2]), ... , (movableN, [startN, stopN, stepN])]'." + trajectories, provide '[movable1, start1, stop1, step1, movable2, start2, " + "stop2, step2, ... , movableN, startN, stopN, stepN]'." ), ], snake_axes: bool = True, # Currently specifying axes to snake is not supported @@ -502,7 +503,7 @@ def step_grid_scan( default (all axes but the first axis provided). """ # TODO: move to using Linspace spec and spec_scan when stable and tested at v1.0 - args, shape = _make_step_scan_args(params, grid=True) + args, shape = _make_step_scan_args_and_shape(params, grid=True) metadata = metadata or {} metadata["shape"] = shape @@ -520,11 +521,11 @@ def step_rscan( ), ], params: Annotated[ - list[tuple[Movable, list[float | int]]], + Sequence[Movable | float | int], Field( - description="List of tuples (device, parameter). For concurrent \ - trajectories, provide '[(movable1, [start1, stop1, step1]), (movable2, \ - [start2, step2]), ... , (movableN, [startN, stepN])]'." + description="For concurrent trajectories, provide " + "'[movable1, start1, stop1, step1, movable2, start2, step2, ... , " + "movableN, startN, stepN]'." ), ], metadata: dict[str, Any] | None = None, @@ -535,7 +536,7 @@ def step_rscan( bluesky.plans.rel_list_scan(det, *args, md=metadata). """ # TODO: move to using Linspace spec and spec_scan when stable and tested at v1.0 - args, shape = _make_step_scan_args(params, grid=False) + args, shape = _make_step_scan_args_and_shape(params, grid=False) metadata = metadata or {} metadata["shape"] = shape @@ -551,11 +552,11 @@ def step_grid_rscan( ), ], params: Annotated[ - list[tuple[Movable, list[float | int]]], + Sequence[Movable | float | int], Field( - description="List of tuples (device, parameter). For independent \ - trajectories, provide '[(movable1, [start1, stop1, step1]), (movable2, \ - [start2, stop2, step2]), ... , (movableN, [startN, stopN, stepN])]'." + description="For independent trajectories, provide " + "'[movable1, start1, stop1, step1, movable2, start2, stop2, step2, ... , " + "movableN, startN, stopN, stepN]'." ), ], snake_axes: bool = True, # Currently specifying axes to snake is not supported @@ -568,7 +569,7 @@ def step_grid_rscan( default (all axes but the first axis provided). """ # TODO: move to using Linspace spec and spec_scan when stable and tested at v1.0 - args, shape = _make_step_scan_args(params, grid=True) + args, shape = _make_step_scan_args_and_shape(params, grid=True) metadata = metadata or {} metadata["shape"] = shape diff --git a/tests/plans/test_wrapped.py b/tests/plans/test_wrapped.py index 43bf8b21ec3..c27c208a4ba 100644 --- a/tests/plans/test_wrapped.py +++ b/tests/plans/test_wrapped.py @@ -1,3 +1,4 @@ +import re from collections.abc import Mapping, Sequence from typing import cast @@ -20,9 +21,7 @@ from dodal.devices.motors import Motor from dodal.plans.wrapped import ( - _make_list_scan_args, - _make_num_scan_args, - _make_step_scan_args, + _make_step_scan_args_and_shape, _make_stepped_list_num, _make_stepped_list_step, _round_list_elements, @@ -35,6 +34,7 @@ num_grid_scan, num_rscan, num_scan, + require, step_grid_rscan, step_grid_scan, step_rscan, @@ -42,6 +42,13 @@ ) +def assert_expected_shape( + run_engine_documents: Mapping[str, list[dict]], expected_shape: tuple[int, ...] +) -> None: + start = run_engine_documents["start"][0] + assert start["shape"] == expected_shape + + def test_count_delay_validation(det: StandardDetector, run_engine: RunEngine): args: dict[float | Sequence[float], str] = { # type: ignore # List wrong length @@ -61,7 +68,6 @@ def test_count_delay_validation(det: StandardDetector, run_engine: RunEngine): for delay, reason in args.items(): with pytest.raises((ValidationError, AssertionError), match=reason): run_engine(count([det], num=3, delay=delay)) - print(delay) def test_count_detectors_validation(run_engine: RunEngine): @@ -99,10 +105,10 @@ def test_count_plan_produces_expected_start_document( start = run_engine_documents.get("start") assert start and len(start) == 1 run_start = cast(RunStart, start[0]) - assert run_start.get("shape") == shape assert (hints := run_start.get("hints")) and ( hints.get("dimensions") == [(("time",), "primary")] ) + assert_expected_shape(run_engine_documents, (num,)) @pytest.mark.parametrize("num, length", ([1, 1], [3, 3])) @@ -187,28 +193,6 @@ def test_count_with_no_detector_raise_error(run_engine: RunEngine): run_engine(count([])) -@pytest.mark.parametrize( - "x_list, y_list, num, final_shape, final_length", - ( - [[0.0, 1.1], [2.2, 3.3], 3, [3], 6], - [[0.0, 1.1, 2], [2.2, 3.3, 3], None, [2, 3], 8], - ), -) -def test_make_num_scan_args( - x_axis: Motor, - y_axis: Motor, - x_list: list[float | int], - y_list: list[float | int], - num: int | None, - final_shape: list[int], - final_length: int, -): - args, shape = _make_num_scan_args([(x_axis, x_list), (y_axis, y_list)], num=num) - assert shape == final_shape - assert len(args) == final_length - assert args[0] == x_axis - - def _assert_emitted( run_engine_documents: Mapping[str, list[dict]], detectors: Sequence[StandardDetector], @@ -254,8 +238,9 @@ def test_num_scan_with_one_axis( x_list: list[float | int], num: int, ): - run_engine(num_scan(detectors=detectors, params=[(x_axis, x_list)], num=num)) + run_engine(num_scan(detectors=detectors, params=[x_axis, *x_list], num=num)) _assert_emitted(run_engine_documents, detectors, num) + assert_expected_shape(run_engine_documents, (num,)) @pytest.mark.parametrize( @@ -274,18 +259,19 @@ def test_num_scan_with_two_axes( run_engine( num_scan( detectors=detectors, - params=[(x_axis, x_list), (y_axis, y_list)], + params=[x_axis, *x_list, y_axis, *y_list], num=num, ) ) _assert_emitted(run_engine_documents, detectors, num) + assert_expected_shape(run_engine_documents, (num,)) def test_num_scan_fails_when_given_wrong_number_of_params( - run_engine: RunEngine, detectors: Sequence[StandardDetector], x_axis: Motor + run_engine: RunEngine, x_axis: Motor ): with pytest.raises(ValueError): - run_engine(num_scan(detectors=detectors, params=[(x_axis, [-1, 1, 5])], num=5)) + run_engine(num_scan(detectors=[], params=[x_axis, -1, 1, 5], num=5)) @pytest.mark.parametrize( @@ -294,7 +280,6 @@ def test_num_scan_fails_when_given_wrong_number_of_params( ) def test_num_scan_fails_when_given_bad_info( run_engine: RunEngine, - detectors: Sequence[StandardDetector], x_axis: Motor, x_list: list[float | int], y_axis: Motor, @@ -304,70 +289,67 @@ def test_num_scan_fails_when_given_bad_info( with pytest.raises(ValueError): run_engine( num_scan( - detectors=detectors, - params=[(x_axis, x_list), (y_axis, y_list)], + detectors=[], + params=[x_axis, *x_list, y_axis, *y_list], num=num, ) ) @pytest.mark.parametrize( - "x_list, y_list", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) + "x_list, y_list", ([(-1.1, 1.1, 5), (2.2, -2.2, 3)], [(0, 1.1, 5), (2.2, 3.3, 5)]) ) def test_num_grid_scan( run_engine: RunEngine, run_engine_documents: Mapping[str, list[dict]], detectors: Sequence[StandardDetector], x_axis: Motor, - x_list: list[float | int], + x_list: tuple[float, float, int], y_axis: Motor, - y_list: list[float | int], + y_list: tuple[float, float, int], ): num = int(x_list[-1] * y_list[-1]) run_engine( num_grid_scan( detectors=detectors, - params=[(x_axis, x_list), (y_axis, y_list)], + params=[x_axis, *x_list, y_axis, *y_list], ) ) _assert_emitted(run_engine_documents, detectors, num) + assert_expected_shape(run_engine_documents, (x_list[2], y_list[2])) @pytest.mark.parametrize( - "x_list, y_list", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) + "x_list, y_list", ([(-1.1, 1.1, 5), (2.2, -2.2, 3)], [(0, 1.1, 5), (2.2, 3.3, 5)]) ) def test_num_grid_scan_when_not_snaking( run_engine: RunEngine, run_engine_documents: Mapping[str, list[dict]], detectors: Sequence[StandardDetector], x_axis: Motor, - x_list: list[float | int], + x_list: tuple[float, float, int], y_axis: Motor, - y_list: list[float | int], + y_list: tuple[float, float, int], ): num = int(x_list[-1] * y_list[-1]) run_engine( num_grid_scan( detectors=detectors, - params=[(x_axis, x_list), (y_axis, y_list)], + params=[x_axis, *x_list, y_axis, *y_list], snake_axes=False, ) ) _assert_emitted(run_engine_documents, detectors, num) + assert_expected_shape(run_engine_documents, (x_list[2], y_list[2])) def test_num_grid_scan_fails_when_given_wrong_number_of_params( run_engine: RunEngine, - detectors: Sequence[StandardDetector], x_axis: Motor, y_axis: Motor, ): with pytest.raises(ValueError): - run_engine( - num_grid_scan( - detectors=detectors, params=[(x_axis, [0, 1.1, 2]), (y_axis, [1.1])] - ) - ) + run_engine(num_grid_scan(detectors=[], params=[x_axis, 0, 1.1, 2, y_axis, 1.1])) @pytest.mark.parametrize( @@ -375,7 +357,6 @@ def test_num_grid_scan_fails_when_given_wrong_number_of_params( ) def test_num_scan_fails_when_asked_to_snake_slow_axis( run_engine: RunEngine, - detectors: Sequence[StandardDetector], x_axis: Motor, x_list: list[float | int], y_axis: Motor, @@ -384,8 +365,8 @@ def test_num_scan_fails_when_asked_to_snake_slow_axis( with pytest.raises(ValueError): run_engine( num_grid_scan( - detectors=detectors, - params=[(x_axis, x_list), (y_axis, y_list)], + detectors=[], + params=[x_axis, *x_list, y_axis, *y_list], snake_axes=[x_axis], ) ) @@ -400,8 +381,9 @@ def test_num_rscan( x_list: list[float | int], num: int, ): - run_engine(num_rscan(detectors=detectors, params=[(x_axis, x_list)], num=num)) + run_engine(num_rscan(detectors=detectors, params=[x_axis, *x_list], num=num)) _assert_emitted(run_engine_documents, detectors, num) + assert_expected_shape(run_engine_documents, (num,)) @pytest.mark.parametrize( @@ -419,10 +401,11 @@ def test_num_rscan_with_two_axes( ): run_engine( num_rscan( - detectors=detectors, params=[(x_axis, x_list), (y_axis, y_list)], num=num + detectors=detectors, params=[x_axis, *x_list, y_axis, *y_list], num=num ) ) _assert_emitted(run_engine_documents, detectors, num) + assert_expected_shape(run_engine_documents, (num,)) @pytest.mark.parametrize( @@ -430,7 +413,6 @@ def test_num_rscan_with_two_axes( ) def test_num_rscan_fails_when_given_bad_info( run_engine: RunEngine, - detectors: Sequence[StandardDetector], x_axis: Motor, x_list: list[float | int], y_axis: Motor, @@ -440,56 +422,58 @@ def test_num_rscan_fails_when_given_bad_info( with pytest.raises(ValueError): run_engine( num_rscan( - detectors=detectors, - params=[(x_axis, x_list), (y_axis, y_list)], + detectors=[], + params=[x_axis, *x_list, y_axis, *y_list], num=num, ) ) @pytest.mark.parametrize( - "x_list, y_list", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) + "x_list, y_list", ([(-1.1, 1.1, 5), (2.2, -2.2, 3)], [(0, 1.1, 5), (2.2, 3.3, 5)]) ) def test_num_grid_rscan( run_engine: RunEngine, run_engine_documents: Mapping[str, list[dict]], detectors: Sequence[StandardDetector], x_axis: Motor, - x_list: list[float | int], + x_list: tuple[float, float, int], y_axis: Motor, - y_list: list[float | int], + y_list: tuple[float, float, int], ): num = int(x_list[-1] * y_list[-1]) run_engine( num_grid_rscan( detectors=detectors, - params=[(x_axis, x_list), (y_axis, y_list)], + params=[x_axis, *x_list, y_axis, *y_list], ) ) _assert_emitted(run_engine_documents, detectors, num) + assert_expected_shape(run_engine_documents, (x_list[2], y_list[2])) @pytest.mark.parametrize( - "x_list, y_list", ([[-1.1, 1.1, 5], [2.2, -2.2, 3]], [[0, 1.1, 5], [2.2, 3.3, 5]]) + "x_list, y_list", ([(-1.1, 1.1, 5), (2.2, -2.2, 3)], [(0, 1.1, 5), (2.2, 3.3, 5)]) ) def test_num_grid_rscan_when_not_snaking( run_engine: RunEngine, run_engine_documents: Mapping[str, list[dict]], detectors: Sequence[StandardDetector], x_axis: Motor, - x_list: list[float | int], + x_list: tuple[float, float, int], y_axis: Motor, - y_list: list[float | int], + y_list: tuple[float, float, int], ): num = int(x_list[-1] * y_list[-1]) run_engine( num_grid_rscan( detectors=detectors, - params=[(x_axis, x_list), (y_axis, y_list)], + params=[x_axis, *x_list, y_axis, *y_list], snake_axes=False, ) ) _assert_emitted(run_engine_documents, detectors, num) + assert_expected_shape(run_engine_documents, (x_list[2], y_list[2])) @pytest.mark.parametrize( @@ -497,7 +481,6 @@ def test_num_grid_rscan_when_not_snaking( ) def test_num_grid_rscan_fails_when_asked_to_snake_slow_axis( run_engine: RunEngine, - detectors: Sequence[StandardDetector], x_axis: Motor, x_list: list[float | int], y_axis: Motor, @@ -506,8 +489,8 @@ def test_num_grid_rscan_fails_when_asked_to_snake_slow_axis( with pytest.raises(ValueError): run_engine( num_grid_rscan( - detectors=detectors, - params=[(x_axis, x_list), (y_axis, y_list)], + detectors=[], + params=[x_axis, *x_list, y_axis, *y_list], snake_axes=[x_axis], ) ) @@ -515,9 +498,12 @@ def test_num_grid_rscan_fails_when_asked_to_snake_slow_axis( @pytest.mark.parametrize( "x_list, y_list, grid, final_shape, final_length", - ([[0, 1, 2], [3, 4, 5], False, [3], 4], [[0, 1, 2], [3, 4, 5, 6], True, [3, 4], 4]), + ( + [[0, 10, 1], [0, 5], False, (11,), 4], + [[0, 10, 1], [0, 5, 1], True, (11, 6), 4], + ), ) -def test_make_list_scan_args( +def test_make_step_scan_args_and_shape( x_axis: Motor, x_list: list, y_axis: Motor, @@ -526,8 +512,8 @@ def test_make_list_scan_args( final_shape: list, final_length: int, ): - args, shape = _make_list_scan_args( - params=[(x_axis, x_list), (y_axis, y_list)], grid=grid + args, shape = _make_step_scan_args_and_shape( + params=[x_axis, *x_list, y_axis, *y_list], grid=grid ) assert len(args) == final_length assert shape == final_shape @@ -538,8 +524,8 @@ def test_make_list_scan_args_fails_when_lists_are_different_lengths( y_axis: Motor, ): with pytest.raises(ValueError): - _make_list_scan_args( - params=[(x_axis, [0, 1, 2]), (y_axis, [0, 1, 2, 3])], grid=False + _make_step_scan_args_and_shape( + params=[x_axis, 0, 1, 2, y_axis, 0, 1, 2, 3], grid=False ) @@ -551,10 +537,10 @@ def test_list_scan( x_axis: Motor, x_list: list, ): - num = int(len(x_list)) - - run_engine(list_scan(detectors=detectors, params=[(x_axis, x_list)])) + num = len(x_list) + run_engine(list_scan(detectors=detectors, params=[x_axis, x_list])) _assert_emitted(run_engine_documents, detectors, num) + assert_expected_shape(run_engine_documents, (num,)) @pytest.mark.parametrize( @@ -574,23 +560,21 @@ def test_list_scan_with_two_axes( y_list: list, ): num = int(len(x_list)) - run_engine( - list_scan(detectors=detectors, params=[(x_axis, x_list), (y_axis, y_list)]) - ) + run_engine(list_scan(detectors=detectors, params=[x_axis, x_list, y_axis, y_list])) _assert_emitted(run_engine_documents, detectors, num) + assert_expected_shape(run_engine_documents, (num,)) def test_list_scan_fails_with_differnt_list_lengths( run_engine: RunEngine, - detectors: Sequence[StandardDetector], x_axis: Motor, y_axis: Motor, ): with pytest.raises(ValueError): run_engine( list_scan( - detectors=detectors, - params=[(x_axis, [1, 2, 3, 4, 5]), (y_axis, [1, 2, 3, 4])], + detectors=[], + params=[x_axis, [1, 2, 3, 4, 5], y_axis, [1, 2, 3, 4]], ) ) @@ -607,15 +591,16 @@ def test_list_grid_scan( run_engine_documents: Mapping[str, list[dict]], detectors: Sequence[StandardDetector], x_axis: Motor, - x_list: list, + x_list: list[float | int], y_axis: Motor, - y_list: list, + y_list: list[float | int], ): num = int(len(x_list) * len(y_list)) run_engine( - list_grid_scan(detectors=detectors, params=[(x_axis, x_list), (y_axis, y_list)]) + list_grid_scan(detectors=detectors, params=[x_axis, x_list, y_axis, y_list]) ) _assert_emitted(run_engine_documents, detectors, num) + assert_expected_shape(run_engine_documents, (len(x_list), len(y_list))) @pytest.mark.parametrize("x_list", ([0, 1, 2, 3], [1.1, 2.2, 3.3])) @@ -627,8 +612,9 @@ def test_list_rscan( x_list: list, ): num = int(len(x_list)) - run_engine(list_rscan(detectors=detectors, params=[(x_axis, x_list)])) + run_engine(list_rscan(detectors=detectors, params=[x_axis, x_list])) _assert_emitted(run_engine_documents, detectors, num) + assert_expected_shape(run_engine_documents, (len(x_list),)) @pytest.mark.parametrize( @@ -649,23 +635,21 @@ def test_list_rscan_with_two_axes( ): num = int(len(x_list)) - run_engine( - list_rscan(detectors=detectors, params=[(x_axis, x_list), (y_axis, y_list)]) - ) + run_engine(list_rscan(detectors=detectors, params=[x_axis, x_list, y_axis, y_list])) _assert_emitted(run_engine_documents, detectors, num) + assert_expected_shape(run_engine_documents, (num,)) def test_list_rscan_fails_with_differnt_list_lengths( run_engine: RunEngine, - detectors: Sequence[StandardDetector], x_axis: Motor, y_axis: Motor, ): with pytest.raises(ValueError): run_engine( list_rscan( - detectors=detectors, - params=[(x_axis, [1, 2, 3, 4, 5]), (y_axis, [1, 2, 3, 4])], + detectors=[], + params=[x_axis, [1, 2, 3, 4, 5], y_axis, [1, 2, 3, 4]], ) ) @@ -689,11 +673,10 @@ def test_list_grid_rscan( num = int(len(x_list) * len(y_list)) run_engine( - list_grid_rscan( - detectors=detectors, params=[(x_axis, x_list), (y_axis, y_list)] - ) + list_grid_rscan(detectors=detectors, params=[x_axis, x_list, y_axis, y_list]) ) _assert_emitted(run_engine_documents, detectors, num) + assert_expected_shape(run_engine_documents, (len(x_list), len(y_list))) @pytest.mark.parametrize( @@ -748,36 +731,30 @@ def test_make_stepped_list_num(start: float, step: float): assert stepped_list[10] == 0 -def test_make_stepped_list_fails_when_given_equal_start_and_stop_values(): - with pytest.raises(ValueError): - _make_stepped_list_step(start=1.1, stop=1.1, step=0.25) +def test_make_stepped_list_num_fails_when_num_is_zero(): + start = stop = 1.1 + with pytest.raises( + ValueError, + match=re.escape( + f"Start ({start}) and stop ({stop}) values cannot be the same." + ), + ): + _make_stepped_list_step(start=start, stop=stop, step=0.25) -@pytest.mark.parametrize( - "x_list, y_list, grid, final_shape, final_length", - ( - [[0, 1, 0.25], [0, 0.1], False, [5], 4], - [[0, 1, 0.25], [0, 1, 0.2], True, [5, 6], 4], - [[0, -1, -0.25], [0, -0.1], False, [5], 4], - [[0, -1, -0.25], [0, -1, -0.2], True, [5, 6], 4], - ), -) -def test_make_step_scan_args( - x_axis: Motor, - x_list: list[float], - y_axis: Motor, - y_list: list[float], - grid: bool, - final_shape: list, - final_length: int, -): - args, shape = _make_step_scan_args( - params=[(x_axis, x_list), (y_axis, y_list)], grid=grid - ) - assert shape == final_shape - assert len(args) == final_length - assert args[0] == x_axis - assert args[2] == y_axis +def test_make_stepped_list_num_fails_when_given_equal_start_and_stop_values(): + with pytest.raises( + ValueError, + match=re.escape("Number of points (0) and number of steps (0) cannot be zero."), + ): + _make_stepped_list_num(start=1, step=0, num=0) + + +def test_require_raises_error_if_not_correct_type(): + with pytest.raises( + ValueError, match="Parameter test must be one of type str, got int." + ): + require(value=5, expected=str, name="test") @pytest.mark.parametrize( @@ -791,16 +768,16 @@ def test_make_step_scan_args( ) def test_make_step_scan_args_fails_when_given_incorrect_number_of_parameters( x_axis: Motor, - x_list: list, + x_list: list[float | int], y_axis: Motor, - y_list: list, + y_list: list[float | int], z_axis: Motor, - z_list: list, + z_list: list[float | int], grid: bool, ): with pytest.raises(ValueError): - _make_step_scan_args( - params=[(x_axis, x_list), (y_axis, y_list), (z_axis, z_list)], grid=grid + _make_step_scan_args_and_shape( + params=[x_axis, *x_list, y_axis, *y_list, z_axis, *z_list], grid=grid ) @@ -812,11 +789,12 @@ def test_step_scan( run_engine_documents: Mapping[str, list[dict]], detectors: Sequence[StandardDetector], x_axis: Motor, - x_list: list, + x_list: list[float | int], num, ): - run_engine(step_scan(detectors=detectors, params=[(x_axis, x_list)])) + run_engine(step_scan(detectors=detectors, params=[x_axis, *x_list])) _assert_emitted(run_engine_documents, detectors, num) + assert_expected_shape(run_engine_documents, (num,)) @pytest.mark.parametrize( @@ -832,23 +810,27 @@ def test_step_scan_with_multiple_axes( run_engine_documents: Mapping[str, list[dict]], detectors: Sequence[StandardDetector], x_axis: Motor, - x_list: list, + x_list: list[float | int], y_axis: Motor, - y_list: list, + y_list: list[float | int], num, ): run_engine( - step_scan(detectors=detectors, params=[(x_axis, x_list), (y_axis, y_list)]) + step_scan(detectors=detectors, params=[x_axis, *x_list, y_axis, *y_list]) ) _assert_emitted(run_engine_documents, detectors, num) + assert_expected_shape(run_engine_documents, (num,)) @pytest.mark.parametrize( - "x_list, y_list, num", + "x_list, expected_num_x, y_list, expected_num_y, snake", ( - [[0, 1, 0.25], [0, 2, 0.5], 25], - [[-1, 1, 0.25], [1, -1, -0.5], 45], - [[0, 10, 2.5], [0, -10, -2.5], 25], + [[0, 1, 0.25], 5, [0, 2, 0.5], 5, True], + [[0, 1, 0.25], 5, [0, 2, 0.5], 5, False], + [[-1, 1, 0.25], 9, [1, -1, -0.5], 5, True], + [[-1, 1, 0.25], 9, [1, -1, -0.5], 5, False], + [[0, 10, 2.5], 5, [0, -10, -2.5], 5, True], + [[0, 10, 2.5], 5, [0, -10, -2.5], 5, False], ), ) def test_step_grid_scan( @@ -856,42 +838,22 @@ def test_step_grid_scan( run_engine_documents: Mapping[str, list[dict]], detectors: Sequence[StandardDetector], x_axis: Motor, - x_list: list, - y_axis: Motor, - y_list: list, - num, -): - run_engine( - step_grid_scan(detectors=detectors, params=[(x_axis, x_list), (y_axis, y_list)]) - ) - _assert_emitted(run_engine_documents, detectors, num) - - -@pytest.mark.parametrize( - "x_list, y_list, num", - ( - [[0, 1, 0.25], [0, 2, 0.5], 25], - [[-1, 1, 0.25], [1, -1, -0.5], 45], - ), -) -def test_step_grid_scan_when_not_snaking( - run_engine: RunEngine, - run_engine_documents: Mapping[str, list[dict]], - detectors: Sequence[StandardDetector], - x_axis: Motor, - x_list: list, + x_list: list[float | int], + expected_num_x: int, y_axis: Motor, - y_list: list, - num, + y_list: list[float | int], + expected_num_y: int, + snake: bool, ): run_engine( step_grid_scan( detectors=detectors, - params=[(x_axis, x_list), (y_axis, y_list)], - snake_axes=False, + params=[x_axis, *x_list, y_axis, *y_list], + snake_axes=snake, ) ) - _assert_emitted(run_engine_documents, detectors, num) + _assert_emitted(run_engine_documents, detectors, expected_num_x * expected_num_y) + assert_expected_shape(run_engine_documents, (expected_num_x, expected_num_y)) @pytest.mark.parametrize( @@ -901,31 +863,33 @@ def test_step_grid_scan_fails_when_given_incorrect_number_of_params( run_engine: RunEngine, detectors: Sequence[StandardDetector], x_axis: Motor, - x_list: list, + x_list: list[float | int], y_axis: Motor, - y_list: list, + y_list: list[float | int], ): with pytest.raises(ValueError): run_engine( step_grid_scan( - detectors=detectors, params=[(x_axis, x_list), (y_axis, y_list)] + detectors=detectors, params=[x_axis, *x_list, y_axis, *y_list] ) ) @pytest.mark.parametrize( - "x_list, num", ([[0, 1, 0.1], 11], [[-1, 1, 0.1], 21], [[0, 10, 1], 11]) + "x_list, num", + ([[0, 1, 0.1], 11], [[-1, 1, 0.1], 21], [[0, 10, 1], 11]), ) def test_step_rscan( run_engine: RunEngine, run_engine_documents: Mapping[str, list[dict]], detectors: Sequence[StandardDetector], x_axis: Motor, - x_list: list, - num, + x_list: list[float | int], + num: int, ): - run_engine(step_rscan(detectors=detectors, params=[(x_axis, x_list)])) + run_engine(step_rscan(detectors=detectors, params=[x_axis, *x_list])) _assert_emitted(run_engine_documents, detectors, num) + assert_expected_shape(run_engine_documents, (num,)) @pytest.mark.parametrize( @@ -941,23 +905,27 @@ def test_step_rscan_with_multiple_axes( run_engine_documents: Mapping[str, list[dict]], detectors: Sequence[StandardDetector], x_axis: Motor, - x_list: list, + x_list: list[float | int], y_axis: Motor, - y_list: list, - num, + y_list: list[float | int], + num: int, ): run_engine( - step_rscan(detectors=detectors, params=[(x_axis, x_list), (y_axis, y_list)]) + step_rscan(detectors=detectors, params=[x_axis, *x_list, y_axis, *y_list]) ) _assert_emitted(run_engine_documents, detectors, num) + assert_expected_shape(run_engine_documents, (num,)) @pytest.mark.parametrize( - "x_list, y_list, num", + "x_list, expected_num_x, y_list, expected_num_y, snake", ( - [[0, 1, 0.25], [0, 2, 0.5], 25], - [[-1, 1, 0.25], [1, -1, -0.5], 45], - [[0, 10, 2.5], [0, -10, -2.5], 25], + [[0, 1, 0.25], 5, [0, 2, 0.5], 5, True], + [[0, 1, 0.25], 5, [0, 2, 0.5], 5, False], + [[-1, 1, 0.25], 9, [1, -1, -0.5], 5, True], + [[-1, 1, 0.25], 9, [1, -1, -0.5], 5, False], + [[0, 10, 2.5], 5, [0, -10, -2.5], 5, True], + [[0, 10, 2.5], 5, [0, -10, -2.5], 5, False], ), ) def test_step_grid_rscan( @@ -965,60 +933,101 @@ def test_step_grid_rscan( run_engine_documents: Mapping[str, list[dict]], detectors: Sequence[StandardDetector], x_axis: Motor, - x_list: list, + x_list: list[float | int], + expected_num_x: int, y_axis: Motor, - y_list: list, - num, + y_list: list[float | int], + expected_num_y: int, + snake: bool, ): run_engine( step_grid_rscan( - detectors=detectors, params=[(x_axis, x_list), (y_axis, y_list)] + detectors=detectors, + params=[x_axis, *x_list, y_axis, *y_list], + snake_axes=snake, ) ) - _assert_emitted(run_engine_documents, detectors, num) + _assert_emitted(run_engine_documents, detectors, expected_num_x * expected_num_y) + assert_expected_shape(run_engine_documents, (expected_num_x, expected_num_y)) -@pytest.mark.parametrize( - "x_list, y_list, num", - ( - [[0, 1, 0.25], [0, 2, 0.5], 25], - [[-1, 1, 0.25], [1, -1, -0.5], 45], - ), -) -def test_step_grid_rscan_when_not_snaking( +@pytest.mark.parametrize("x_list, y_list", ([[0, 1], [0, 1, 0.1]], [[0], [0, 1, 0.1]])) +def test_step_grid_scan_fails_when_given_wrong_number_of_args_for_first_axes( run_engine: RunEngine, - run_engine_documents: Mapping[str, list[dict]], - detectors: Sequence[StandardDetector], x_axis: Motor, - x_list: list, + x_list: list[float | int], y_axis: Motor, - y_list: list, - num, + y_list: list[float | int], ): - run_engine( - step_grid_rscan( - detectors=detectors, - params=[(x_axis, x_list), (y_axis, y_list)], - snake_axes=False, + with pytest.raises( + ValueError, + match="The axis must be movable, start, stop, step.", + ): + run_engine( + step_grid_scan(detectors=[], params=[x_axis, *x_list, y_axis, *y_list]) ) - ) - _assert_emitted(run_engine_documents, detectors, num) @pytest.mark.parametrize( "x_list, y_list", ([[0, 1, 0.1], [0, 1, 0.1, 1]], [[0, 1, 0.1], [0]]) ) -def test_step_grid_rscan_fails_when_given_incorrect_number_of_params( +def test_step_grid_scan_fails_when_given_wrong_number_of_args_for_second_axes( run_engine: RunEngine, - detectors: Sequence[StandardDetector], x_axis: Motor, - x_list: list, + x_list: list[float | int], y_axis: Motor, - y_list: list, + y_list: list[float | int], ): - with pytest.raises(ValueError): + with pytest.raises( + ValueError, + match="The axis must be movable, start, stop, step.", + ): run_engine( - step_grid_rscan( - detectors=detectors, params=[(x_axis, x_list), (y_axis, y_list)] - ) + step_grid_scan(detectors=[], params=[x_axis, *x_list, y_axis, *y_list]) ) + + +@pytest.mark.parametrize( + "x_list, y_list", ([[0, 1, 0.1], [0, 1, 0.1]], [[0, 1, 0.1], [0]]) +) +def test_step_scan_fails_when_given_wrong_number_of_args_for_second_axes( + run_engine: RunEngine, + x_axis: Motor, + x_list: list[float | int], + y_axis: Motor, + y_list: list[float | int], +): + with pytest.raises( + ValueError, + match="The axis must be movable, start, stop.", + ): + run_engine(step_scan(detectors=[], params=[x_axis, *x_list, y_axis, *y_list])) + + +def test_make_step_scan_args_and_shape_fails_with_invalid_type_args( + x_axis: Motor, + y_axis: Motor, +): + with pytest.raises( + ValueError, + match="Scan syntax only takes movables or numbers as parameters.", + ): + _make_step_scan_args_and_shape( + [x_axis, 1, "3", 1, y_axis, 1, "4", 1], # type: ignore + grid=True, + ) + _make_step_scan_args_and_shape( + [x_axis, 1, "3", 1, y_axis, 1, "4"], # type: ignore + grid=False, + ) + + +def test_step_scan_fails_with_step_size_zero( + run_engine: RunEngine, + x_axis: Motor, +): + with pytest.raises( + ValueError, + match="Step size 0 cannot be zero.", + ): + run_engine(step_scan(detectors=[], params=[x_axis, 1, 5, 0]))