diff --git a/docs/how-to/create-beamline.md b/docs/how-to/create-beamline.md index 202162a973b..0386aaec654 100644 --- a/docs/how-to/create-beamline.md +++ b/docs/how-to/create-beamline.md @@ -15,7 +15,7 @@ beamline are instantiated. The file should be named after the colloquial name fo Beamline modules (in ``dodal.beamlines``) are code-as-configuration. They define the set of devices and common device settings needed for a particular beamline or group of similar beamlines (e.g. a beamline and its digital twin). Some of our tooling depends on the convention of *only* beamline modules going in this package. Common utilities should -go somewhere else e.g. ``dodal.utils`` or ``dodal.beamlines.common``. +go somewhere else e.g. ``dodal.utils`` or ``dodal.common.beamlines``. The following example creates a fictitious beamline ``w41``, with a simulated twin ``s41``. ``w41`` needs to monitor the status of the Synchrotron and has an AdAravisDetector. diff --git a/docs/reference/device-standards.md b/docs/reference/device-standards.md index 5f87d7da903..7ea02ade074 100644 --- a/docs/reference/device-standards.md +++ b/docs/reference/device-standards.md @@ -48,7 +48,7 @@ When a device is named in this way, all of its child devices are named appropria x = Motor(prefix + "X") super().__init__(name) - @device_factory() + @devices.factory() def foo() -> MyDevice: return MyDevice("FOO:") diff --git a/src/dodal/cli.py b/src/dodal/cli.py index baaf96a5023..518f98ed4a5 100644 --- a/src/dodal/cli.py +++ b/src/dodal/cli.py @@ -1,17 +1,15 @@ import importlib import os from collections.abc import Mapping -from pathlib import Path import click from bluesky.run_engine import RunEngine -from ophyd_async.core import NotConnectedError, StaticPathProvider, UUIDFilenameProvider -from ophyd_async.plan_stubs import ensure_connected +from click.exceptions import ClickException +from ophyd_async.core import NotConnectedError from dodal.beamlines import all_beamline_names, module_name_for_beamline -from dodal.common.beamlines.beamline_utils import set_path_provider from dodal.device_manager import DeviceManager -from dodal.utils import AnyDevice, filter_ophyd_devices, make_all_devices +from dodal.utils import AnyDevice from . import __version__ @@ -103,7 +101,7 @@ def connect( # We need to make a RunEngine to allow ophyd-async devices to connect. # See https://blueskyproject.io/ophyd-async/main/explanations/event-loop-choice.html - run_engine = RunEngine(call_returns_result=True) + _run_engine = RunEngine(call_returns_result=True) print(f"Attempting connection to {beamline} (using {full_module_path})") @@ -121,15 +119,9 @@ def connect( timeout=timeout, ) else: - print(f"No device manager named '{device_manager}' found in {mod}") - _spoof_path_provider() - devices, instance_exceptions = make_all_devices( - full_module_path, - include_skipped=all, - fake_with_ophyd_sim=sim_backend, - wait_for_connection=False, + raise ClickException( + f"No device manager named '{device_manager}' found in {mod}" ) - devices, connect_exceptions = _connect_devices(run_engine, devices, sim_backend) # Inform user of successful connections _report_successful_devices(devices, sim_backend) @@ -151,35 +143,3 @@ def _report_successful_devices( print(f"{len(devices)} devices connected{sim_statement}:") print(connected_devices) - - -def _connect_devices( - run_engine: RunEngine, - devices: Mapping[str, AnyDevice], - sim_backend: bool, -) -> tuple[Mapping[str, AnyDevice], Mapping[str, Exception]]: - ophyd_devices, ophyd_async_devices = filter_ophyd_devices(devices) - exceptions = {} - - # Connect ophyd devices - for name, device in ophyd_devices.items(): - try: - device.wait_for_connection() - except Exception as ex: - exceptions[name] = ex - - # Connect ophyd-async devices - try: - run_engine(ensure_connected(*ophyd_async_devices.values(), mock=sim_backend)) - except NotConnectedError as ex: - exceptions = {**exceptions, **ex.sub_errors} - - # Only return the subset of devices that haven't raised an exception - successful_devices = { - name: device for name, device in devices.items() if name not in exceptions - } - return successful_devices, exceptions - - -def _spoof_path_provider() -> None: - set_path_provider(StaticPathProvider(UUIDFilenameProvider(), Path("/tmp"))) diff --git a/src/dodal/common/beamlines/beamline_utils.py b/src/dodal/common/beamlines/beamline_utils.py index 16250e7a915..33447c08927 100644 --- a/src/dodal/common/beamlines/beamline_utils.py +++ b/src/dodal/common/beamlines/beamline_utils.py @@ -1,30 +1,10 @@ -import inspect -from collections.abc import Callable -from typing import Annotated, Final, TypeVar, cast - -from bluesky.run_engine import call_in_bluesky_event_loop from daq_config_server import ConfigClient -from ophyd import Device as OphydV1Device -from ophyd.sim import make_fake_device from ophyd_async.core import ( - DEFAULT_TIMEOUT, PathProvider, ) -from ophyd_async.core import Device as OphydV2Device -from ophyd_async.core import wait_for_connection as v2_device_wait_for_connection from dodal.log import LOGGER -from dodal.utils import ( - AnyDevice, - BeamlinePrefix, - DeviceInitializationController, - SkipType, - skip_device, -) - -DEFAULT_CONNECTION_TIMEOUT: Final[float] = 5.0 -ACTIVE_DEVICES: dict[str, AnyDevice] = {} BL = "" @@ -33,134 +13,6 @@ def set_beamline(beamline: str): BL = beamline -def clear_devices(): - global ACTIVE_DEVICES - for name in list(ACTIVE_DEVICES): - clear_device(name) - - -def clear_device(name: str): - global ACTIVE_DEVICES - device = ACTIVE_DEVICES[name] - if isinstance(device, OphydV1Device): - device.destroy() - del ACTIVE_DEVICES[name] - - -def list_active_devices() -> list[str]: - global ACTIVE_DEVICES - return list(ACTIVE_DEVICES.keys()) - - -def active_device_is_same_type( - active_device: AnyDevice, device: Callable[..., AnyDevice] -) -> bool: - return inspect.isclass(device) and isinstance(active_device, device) - - -def wait_for_connection( - device: AnyDevice, - timeout: float = DEFAULT_CONNECTION_TIMEOUT, - mock: bool = False, -) -> None: - if isinstance(device, OphydV1Device): - device.wait_for_connection(timeout=timeout) - elif isinstance(device, OphydV2Device): - call_in_bluesky_event_loop( - v2_device_wait_for_connection( - coros=device.connect(mock=mock, timeout=timeout) - ), - ) - else: - raise TypeError( - f"Invalid type {device.__class__.__name__} in _wait_for_connection" - ) - - -T = TypeVar("T", bound=AnyDevice) - - -@skip_device() -def device_instantiation( - device_factory: Callable[..., T], - name: str, - prefix: str, - wait: bool, - fake: bool, - post_create: Callable[[T], None] | None = None, - bl_prefix: bool = True, - **kwargs, -) -> T: - """Method to allow generic creation of singleton devices. Meant to be used to easily - define lists of devices in beamline files. Additional keyword arguments are passed - directly to the device constructor. - - Args: - device_factory (Callable): The device class. - name (str): The name for ophyd. - prefix (str): The PV prefix for the most (usually all) components. - wait (bool): Whether to run .wait_for_connection(). - fake (bool): Whether to fake with ophyd.sim. - post_create (Callable): (optional) a function to be run on the device after - creation. - bl_prefix (bool): If true, add the beamline prefix when instantiating. - **kwargs: Arguments passed on to every device factory. - - Returns: - The instance of the device. - """ - already_existing_device: AnyDevice | None = ACTIVE_DEVICES.get(name) - if fake: - device_factory = cast(Callable[..., T], make_fake_device(device_factory)) - if already_existing_device is None: - device_instance = device_factory( - name=name, - prefix=( - f"{(BeamlinePrefix(BL).beamline_prefix)}{prefix}" - if bl_prefix - else prefix - ), - **kwargs, - ) - ACTIVE_DEVICES[name] = device_instance - if wait: - wait_for_connection(device_instance, mock=fake) - - else: - if not active_device_is_same_type(already_existing_device, device_factory): - raise TypeError( - f"Can't instantiate device of type {device_factory} with the same " - f"name as an existing device. Device name '{name}' already used for " - f"a(n) {type(already_existing_device)}." - ) - device_instance = cast(T, already_existing_device) - if post_create: - post_create(device_instance) - return device_instance - - -def device_factory( - *, - use_factory_name: Annotated[bool, "Use factory name as name of device"] = True, - timeout: Annotated[float, "Timeout for connecting to the device"] = DEFAULT_TIMEOUT, - mock: Annotated[bool, "Use Signals with mock backends for device"] = False, - skip: Annotated[ - SkipType, - "mark the factory to be (conditionally) skipped when beamline is imported by external program", - ] = False, -) -> Callable[[Callable[[], T]], DeviceInitializationController[T]]: - def decorator(factory: Callable[[], T]) -> DeviceInitializationController[T]: - return DeviceInitializationController( - factory, - use_factory_name, - timeout, - mock, - skip, - ) - - return decorator - - def set_path_provider(provider: PathProvider): global PATH_PROVIDER diff --git a/src/dodal/device_manager.py b/src/dodal/device_manager.py index 0d4dfd55dd1..5ec5ec46169 100644 --- a/src/dodal/device_manager.py +++ b/src/dodal/device_manager.py @@ -14,19 +14,14 @@ NamedTuple, ParamSpec, Self, + TypeAlias, TypeVar, ) from bluesky.run_engine import get_bluesky_event_loop +from ophyd.device import Device as OphydV1Device from ophyd.sim import make_fake_device - -from dodal.common.beamlines.beamline_utils import wait_for_connection -from dodal.utils import ( - AnyDevice, - OphydV1Device, - OphydV2Device, - SkipType, -) +from ophyd_async.core import Device as OphydV2Device DEFAULT_TIMEOUT = 30 NO_DOCS = "No documentation available." @@ -34,8 +29,11 @@ T = TypeVar("T") Args = ParamSpec("Args") +SkipType = bool | Callable[[], bool] + V1 = TypeVar("V1", bound=OphydV1Device) V2 = TypeVar("V2", bound=OphydV2Device) +AnyDevice: TypeAlias = OphydV1Device | OphydV2Device DeviceFactoryDecorator = Callable[[Callable[Args, V2]], "DeviceFactory[Args, V2]"] OphydInitialiser = Callable[Concatenate[V1, Args], V1 | None] @@ -249,7 +247,7 @@ def __call__(self, dev: V1, *args: Args.args, **kwargs: Args.kwargs): def create(self, *args: Args.args, **kwargs: Args.kwargs) -> V1: device = self.factory(name=self.name, prefix=self.prefix) if self.wait: - wait_for_connection(device, timeout=self.timeout) + device.wait_for_connection(timeout=self.timeout) self.post_create(device, *args, **kwargs) return device diff --git a/src/dodal/plans/save_panda.py b/src/dodal/plans/save_panda.py index 88fc3497f6b..b04a80d77a4 100644 --- a/src/dodal/plans/save_panda.py +++ b/src/dodal/plans/save_panda.py @@ -1,4 +1,5 @@ import argparse +import importlib import os import sys from argparse import ArgumentParser @@ -12,7 +13,7 @@ ) from dodal.beamlines import module_name_for_beamline -from dodal.utils import make_device +from dodal.device_manager import DeviceFactory, DeviceManager def main(argv: list[str] | None = None): @@ -70,19 +71,25 @@ def main(argv: list[str] | None = None): return 0 +def _build_panda(beamline, device_name) -> Device: + print(f"Building {device_name} for beamline {beamline}") + module_name = module_name_for_beamline(beamline) + mod = importlib.import_module("dodal.beamlines." + module_name) + device_manager: DeviceManager = mod.devices + return cast(DeviceFactory, device_manager[device_name]).build( + connect_immediately=True + ) + + def _save_panda(beamline, device_name, output_directory, file_name): run_engine = RunEngine() - print("Creating devices...") - module_name = module_name_for_beamline(beamline) + try: - devices = make_device( - f"dodal.beamlines.{module_name}", device_name, connect_immediately=True - ) + panda = _build_panda(beamline, device_name) except Exception as error: sys.stderr.write(f"Couldn't create device {device_name}: {error}\n") sys.exit(1) - panda = devices[device_name] print( f"Saving to {output_directory}/{file_name} from {device_name} on {beamline}..." ) diff --git a/src/dodal/utils.py b/src/dodal/utils.py index 5d3bae516d0..1994880b7b1 100644 --- a/src/dodal/utils.py +++ b/src/dodal/utils.py @@ -1,24 +1,12 @@ -import functools import importlib -import inspect import os import re import socket import string -from collections.abc import Callable, Iterable, Mapping from dataclasses import dataclass -from functools import update_wrapper, wraps -from importlib import import_module -from inspect import signature from os import environ -from types import FunctionType, ModuleType -from typing import ( - Any, - Generic, - TypeAlias, - TypeGuard, - TypeVar, -) +from types import ModuleType +from typing import TypeAlias from bluesky.protocols import ( Checkable, @@ -36,7 +24,6 @@ Triggerable, WritesExternalAssets, ) -from bluesky.run_engine import call_in_bluesky_event_loop from ophyd.device import Device as OphydV1Device from ophyd_async.core import Device as OphydV2Device @@ -61,9 +48,6 @@ ] AnyDevice: TypeAlias = OphydV1Device | OphydV2Device -V1DeviceFactory: TypeAlias = Callable[..., OphydV1Device] -V2DeviceFactory: TypeAlias = Callable[..., OphydV2Device] -AnyDeviceFactory: TypeAlias = V1DeviceFactory | V2DeviceFactory def get_beamline_name(default: str | None = None) -> str: @@ -93,392 +77,6 @@ def __post_init__(self): self.frontend_prefix = f"FE{self.ixx[1:3]}{self.suffix}" -T = TypeVar("T", bound=AnyDevice) - -SkipType = bool | Callable[[], bool] - - -def skip_device(precondition=lambda: True): - def decorator(func: Callable[..., T]) -> Callable[..., T]: - @wraps(func) - def wrapper(*args, **kwds) -> T: - return func(*args, **kwds) - - if precondition(): - wrapper.__skip__ = True # type: ignore - return wrapper - - return decorator - - -class DeviceInitializationController(Generic[T]): - def __init__( - self, - factory: Callable[[], T], - use_factory_name: bool, - timeout: float, - mock: bool, - skip: SkipType, - ): - self._factory: Callable[..., T] = functools.cache(factory) - self._use_factory_name = use_factory_name - self._timeout = timeout - self._mock = mock - self._skip = skip - update_wrapper(self, factory) - - @property - def skip(self) -> bool: - return self._skip() if callable(self._skip) else self._skip - - def cache_clear(self) -> None: - """Clears the controller's internal cached instance of the device, if present. - Noop if not. - """ - # Functools adds the cache_clear function via setattr so the type checker - # does not pick it up. - self._factory.cache_clear() # type: ignore - - def __call__( - self, - connect_immediately: bool = False, - name: str | None = None, - connection_timeout: float | None = None, - mock: bool | None = None, - **kwargs, - ) -> T: - """Returns an instance of the Device the wrapped factory produces: the same - instance will be returned if this method is called multiple times, and arguments - may be passed to override this Controller's configuration. - Once the device is connected, the value of mock must be consistent, or connect - must be False. - - Additional keyword arguments will be passed through to the wrapped factory - function. - - Args: - connect_immediately (bool, default False): Whether to call connect on the - device before returning it- connect is idempotent for ophyd-async - devices. Not connecting to the device allows for the instance to be - created prior to the RunEngine event loop being configured or for - connect to be called lazily e.g. by the `ensure_connected` stub. - name (str, optional): An override name to give the device, which is - also used to name its children. Defaults to None, which does not name - the device unless the device has no name and this Controller is - configured to use_factory_name, which propagates the name of the wrapped - factory function to the device instance. - connection_timeout (float, optional): An override timeout length in - seconds for the connect method, if it is called. Defaults to None, which - defers to the timeout configured for this Controller: the default uses - ophyd_async's DEFAULT_TIMEOUT. - mock (bool, optional): Overrides whether to connect to Mock signal - backends, if connect is called. Defaults to None, which uses the mock - parameter of this Controller. This value must be used consistently when - connect is called on the Device. - **kwargs: Arguments passed on to every factory. - - Returns: - T: A singleton instance of the Device class returned by the wrapped factory. - - Raises: - RuntimeError: If the device factory was invoked again with different - keyword arguments, without previously invoking cache_clear(). - """ - is_v2_device = is_v2_device_factory(self._factory) - is_mock = mock if mock is not None else self._mock - if is_v2_device: - device: T = self._factory(**kwargs) - else: - device: T = self._factory(mock=is_mock, **kwargs) - - if self._factory.cache_info().currsize > 1: # type: ignore - raise RuntimeError( - f"Device factory method called multiple times with different parameters: " - f"{self.__name__}" # type: ignore - ) - - if connect_immediately: - timeout = ( - connection_timeout if connection_timeout is not None else self._timeout - ) - if is_v2_device: - call_in_bluesky_event_loop( - device.connect(timeout=timeout, mock=is_mock) - ) - else: - assert is_v1_device_type(type(device)) - device.wait_for_connection(timeout=timeout) # type: ignore - - if name: - device.set_name(name) - elif not device.name and self._use_factory_name: - device.set_name(self._factory.__name__) - - return device - - -def make_device( - module: str | ModuleType, - device_name: str, - **kwargs, -) -> dict[str, AnyDevice]: - """Make a single named device and its dependencies from the given beamline module. - - Args: - module (str | ModuleType): The module to make devices from. - device_name: Name of the device to construct. - **kwargs: Arguments passed on to every device factory. - - Returns: - dict[str, AnyDevice]: A dict mapping device names to the constructed devices. - """ - if isinstance(module, str): - module = import_module(module) - - device_collector = {} - factories = collect_factories(module) - device_collector[device_name] = _make_one_device( - module, device_name, device_collector, factories, **kwargs - ) - return device_collector - - -def make_all_devices( - module: str | ModuleType | None = None, include_skipped: bool = False, **kwargs -) -> tuple[dict[str, AnyDevice], dict[str, Exception]]: - """Makes all devices in the given beamline module, for those modules using device - factories as opposed to the DeviceManager. - - In cases of device interdependencies it ensures a device is created before any which - depend on it. - - Args: - module (str | ModuleType | None, optional): The module to make devices from. - include_skipped (bool, optional): If True, also load factories with the - @skip_device annotation. Defaults to False. - **kwargs: Arguments passed on to every device. - - Returns: - Tuple[Dict[str, AnyDevice], Dict[str, Exception]]: This represents a tuple - containing two dictionaries: - -A dictionary where the keys are device names and the values are devices. - -A dictionary where the keys are device names and the values are exceptions. - """ - if isinstance(module, str) or module is None: - module = import_module(module or __name__) - factories = collect_factories(module, include_skipped) - devices: tuple[dict[str, AnyDevice], dict[str, Exception]] = invoke_factories( - factories, **kwargs - ) - - return devices - - -def invoke_factories( - factories: Mapping[str, AnyDeviceFactory], - **kwargs, -) -> tuple[dict[str, AnyDevice], dict[str, Exception]]: - """Call device factory functions in the correct order to resolve dependencies. - Inspect function signatures to work out dependencies and execute functions in - correct order. - - If one device takes another as an argument (by name, similar to pytest fixtures) - this will detect a dependency and create and cache the non-dependant device first. - - Args: - factories (Mapping[str, AnyDeviceFactory]): Mapping of function name -> function. - **kwargs: Optional key word arguments: `mock` and `connect_immediately`. - - Returns: - Tuple[Dict[str, AnyDevice], Dict[str, Exception]]: Tuple of two dictionaries: - - One mapping device name to device. - - One mapping device name to exception for any failed devices. - """ - devices: dict[str, AnyDevice] = {} - exceptions: dict[str, Exception] = {} - - # Compute tree of dependencies, - dependencies = { - factory_name: set(extract_dependencies(factories, factory_name)) - for factory_name in factories.keys() - } - while (len(devices) + len(exceptions)) < len(factories): - leaves = [ - device - for device, device_dependencies in dependencies.items() - if (device not in devices and device not in exceptions) - and len(device_dependencies - set(devices.keys())) == 0 - ] - dependent_name = leaves.pop() - params = {name: devices[name] for name in dependencies[dependent_name]} - try: - factory = factories[dependent_name] - if isinstance(factory, DeviceInitializationController): - # For now we translate the old-style parameters that - # device_instantiation expects. Once device_instantiation is gone and - # replaced with DeviceInitializationController we can formalise the - # API of make_all_devices and make these parameters explicit. - # https://github.com/DiamondLightSource/dodal/issues/844 - mock = kwargs.get( - "mock", - kwargs.get( - "fake_with_ophyd_sim", - False, - ), - ) - connect_immediately = kwargs.get( - "connect_immediately", - kwargs.get( - "wait_for_connection", - False, - ), - ) - devices[dependent_name] = factory( - mock=mock, - connect_immediately=connect_immediately, - ) - else: - devices[dependent_name] = factory(**params, **kwargs) - except Exception as e: - exceptions[dependent_name] = e - - all_devices = {device.name: device for device in devices.values()} - - return (all_devices, exceptions) - - -def extract_dependencies( - factories: Mapping[str, AnyDeviceFactory], factory_name: str -) -> Iterable[str]: - """Compute dependencies for a device factory. Dependencies are named in the - factory function signature, similar to pytest fixtures. For example given - def device_one(): and def device_two(device_one: Readable):, indicate that - device_one is a dependency of device_two. - - Args: - factories (Mapping[str, AnyDeviceFactory]): All factories, mapping of - function name -> function - factory_name (str): The name of the factory in factories whose dependencies need - computing - - Returns: - Iterable[str]: Generator of factory names - - Yields: - Iterator[Iterable[str]]: Factory names - """ - for name, param in inspect.signature(factories[factory_name]).parameters.items(): - if param.default is inspect.Parameter.empty and name in factories: - yield name - - -def collect_factories( - module: ModuleType, include_skipped: bool = False -) -> dict[str, AnyDeviceFactory]: - """Automatically detect device factory functions within a module. They are detected - via the return type signature e.g. def my_device() -> ADeviceType: - - Args: - module (ModuleType): The module to inspect - include_skipped (bool, optional): If True, also load factories with the - @skip_device annotation. Defaults to False. - - Returns: - dict[str, AnyDeviceFactory]: Mapping of factory name -> factory. - """ # noqa D415 - factories: dict[str, AnyDeviceFactory] = {} - - for var in module.__dict__.values(): - if ( - callable(var) - and is_any_device_factory(var) - and (include_skipped or not _is_device_skipped(var)) - ): - factories[var.__name__] = var - return factories - - -def _is_device_skipped(func: AnyDeviceFactory) -> bool: - if isinstance(func, DeviceInitializationController): - return func.skip - return getattr(func, "__skip__", False) - - -def is_v1_device_factory(func: Callable) -> TypeGuard[V1DeviceFactory]: - try: - return_type = signature(func).return_annotation - return is_v1_device_type(return_type) - except ValueError: - return False - - -def is_v2_device_factory(func: Callable) -> TypeGuard[V2DeviceFactory]: - try: - return_type = signature(func).return_annotation - return is_v2_device_type(return_type) - except ValueError: - return False - - -def is_any_device_factory(func: Callable) -> TypeGuard[AnyDeviceFactory]: - return isinstance(func, FunctionType | DeviceInitializationController) and ( - is_v1_device_factory(func) or is_v2_device_factory(func) - ) - - -def is_v2_device_type(obj: type[Any]) -> bool: - non_parameterized_class = None - if obj != inspect.Signature.empty: - if inspect.isclass(obj): - non_parameterized_class = obj - elif hasattr(obj, "__origin__"): - # typing._GenericAlias is the same as types.GenericAlias, maybe? - # This is all very badly documented and possibly prone to change in future versions of Python - non_parameterized_class = obj.__origin__ - if non_parameterized_class: - return non_parameterized_class and issubclass( - non_parameterized_class, OphydV2Device - ) - - return False - - -def is_v1_device_type(obj: type[Any]) -> bool: - is_class = inspect.isclass(obj) - follows_protocols = any(isinstance(obj, protocol) for protocol in BLUESKY_PROTOCOLS) - return is_class and follows_protocols and not is_v2_device_type(obj) - - -def filter_ophyd_devices( - devices: Mapping[str, AnyDevice], -) -> tuple[Mapping[str, OphydV1Device], Mapping[str, OphydV2Device]]: - """Split a dictionary of ophyd and ophyd-async devices (i.e. the output of - make_all_devices) into 2 separate dictionaries of the different types. Useful when - special handling is needed for each type of device. - - Args: - devices (Mapping[str, AnyDevice]): Dictionary of device name to ophyd or - ophyd-async device. - - Raises: - ValueError: If anything in the dictionary doesn't come from either library. - - Returns: - Tuple of two dictionaries, one mapping names to ophyd devices and one mapping - names to ophyd-async devices. - """ - ophyd_devices = {} - ophyd_async_devices = {} - for name, device in devices.items(): - if isinstance(device, OphydV1Device): - ophyd_devices[name] = device - elif isinstance(device, OphydV2Device): - ophyd_async_devices[name] = device - else: - raise ValueError(f"{name}: {device} is not an ophyd or ophyd-async device") - return ophyd_devices, ophyd_async_devices - - def get_beamline_based_on_environment_variable() -> ModuleType: """Gets the dodal module for the current beamline, as specified by the BEAMLINE environment variable. @@ -542,30 +140,3 @@ def get_run_number(directory: str, prefix: str = "") -> int: return 1 else: return _find_next_run_number_from_files(nexus_file_names) - - -def _make_one_device( - module: ModuleType, - device_name: str, - devices: dict[str, AnyDevice], - factories: dict[str, AnyDeviceFactory], - **kwargs, -) -> AnyDevice: - factory = factories.get(device_name) - if not factory: - raise ValueError(f"Unable to find factory for {device_name}") - - dependencies = list(extract_dependencies(factories, device_name)) - for dependency_name in dependencies: - if dependency_name not in devices: - try: - devices[dependency_name] = _make_one_device( - module, dependency_name, devices, factories, **kwargs - ) - except Exception as e: - raise RuntimeError( - f"Unable to construct device {dependency_name}" - ) from e - - params = {name: devices[name] for name in dependencies} - return factory(**params, **kwargs) diff --git a/tests/common/beamlines/test_beamline_utils.py b/tests/common/beamlines/test_beamline_utils.py deleted file mode 100644 index ec58f066776..00000000000 --- a/tests/common/beamlines/test_beamline_utils.py +++ /dev/null @@ -1,232 +0,0 @@ -import asyncio -import functools -from unittest.mock import ANY, AsyncMock, MagicMock, patch - -import pytest -from ophyd import Device -from ophyd.device import Device as OphydV1Device -from ophyd.sim import FakeEpicsSignal -from ophyd_async.core import Device as OphydV2Device -from ophyd_async.core import StandardReadable - -from dodal.common.beamlines import beamline_utils -from dodal.devices.eiger import EigerDetector -from dodal.devices.focusing_mirror import FocusingMirror -from dodal.devices.motors import XYZStage -from dodal.devices.smargon import Smargon -from dodal.log import LOGGER -from dodal.utils import DeviceInitializationController - - -@pytest.fixture(autouse=True) -def i03_beamline(): - with patch("dodal.common.beamlines.beamline_utils.BL", "i03") as bl: - yield bl - - -@pytest.fixture(autouse=True) -def flush_event_loop_on_finish(): - event_loop = asyncio.get_event_loop() - # wait for the test function to complete - yield None - - if pending_tasks := asyncio.all_tasks(event_loop): - LOGGER.warning(f"Waiting for pending tasks to complete {pending_tasks}") - event_loop.run_until_complete(asyncio.gather(*pending_tasks)) - - -def test_instantiate_function_makes_supplied_device(): - device_types = [XYZStage, Smargon] - for device in device_types: - dev = beamline_utils.device_instantiation( - device, device.__name__, "", False, True, None - ) - assert isinstance(dev, device) - - -def test_instantiating_different_device_with_same_name(): - dev1 = beamline_utils.device_instantiation( # noqa - XYZStage, "device", "", False, True, None - ) - with pytest.raises(TypeError): - dev2 = beamline_utils.device_instantiation( - Smargon, "device", "", False, True, None - ) - beamline_utils.clear_device("device") - dev2 = beamline_utils.device_instantiation( # noqa - Smargon, "device", "", False, True, None - ) - assert dev1.name == dev2.name - assert type(dev1) is not type(dev2) - assert dev1 not in beamline_utils.ACTIVE_DEVICES.values() - assert dev2 in beamline_utils.ACTIVE_DEVICES.values() - - -def test_instantiate_v1_function_fake_makes_fake(): - eiger: EigerDetector = beamline_utils.device_instantiation( - EigerDetector, - "eiger", - "", - True, - True, - None, - beamline="test", - ispyb_detector_id=0, - ) - assert isinstance(eiger, Device) - assert isinstance(eiger.stale_params, FakeEpicsSignal) - - -def test_instantiate_v2_function_fake_makes_fake(): - fake_smargon: Smargon = beamline_utils.device_instantiation( - Smargon, "smargon", "", True, True, None - ) - assert isinstance(fake_smargon, StandardReadable) - assert fake_smargon.omega.user_setpoint.source.startswith("mock+ca") - - -@pytest.mark.parametrize( - "kwargs,expected_timeout", [({}, 5.0), ({"timeout": 15.0}, 15.0)] -) -def test_wait_for_v1_device_connection_passes_through_timeout(kwargs, expected_timeout): - device = OphydV1Device(name="") - device.wait_for_connection = MagicMock() - - beamline_utils.wait_for_connection(device, **kwargs) - - device.wait_for_connection.assert_called_once_with(timeout=expected_timeout) - - -@pytest.mark.parametrize( - "kwargs,expected_timeout", [({}, 5.0), ({"timeout": 15.0}, 15.0)] -) -@patch( - "dodal.common.beamlines.beamline_utils.v2_device_wait_for_connection", - new=AsyncMock(), -) -def test_wait_for_v2_device_connection_passes_through_timeout(kwargs, expected_timeout): - device = OphydV2Device() - device.connect = MagicMock() - - beamline_utils.wait_for_connection(device, **kwargs) - - device.connect.assert_called_once_with( - mock=ANY, - timeout=expected_timeout, - ) - - -def dummy_mirror() -> FocusingMirror: - mirror = MagicMock(spec=FocusingMirror) - connect = AsyncMock() - mirror.connect = connect - - def set_name(name: str): - mirror.name = name # type: ignore - - mirror.set_name.side_effect = set_name - mirror.set_name("") - return mirror - - -@beamline_utils.device_factory(mock=True) -def dummy_mirror_as_device_factory() -> FocusingMirror: - return dummy_mirror() - - -@beamline_utils.device_factory(mock=True) -@functools.lru_cache -def cached_dummy_mirror_as_device_factory() -> FocusingMirror: - return dummy_mirror() - - -def test_device_controller_name_propagated(): - mirror = dummy_mirror_as_device_factory(name="foo") - assert mirror.name == "foo" - - -def test_device_controller_connection_is_lazy(): - mirror = dummy_mirror_as_device_factory(name="foo") - assert mirror.connect.call_count == 0 # type: ignore - - -def test_device_controller_eager_connect(): - mirror = dummy_mirror_as_device_factory(connect_immediately=True) - assert mirror.connect.call_count == 1 # type: ignore - - -@pytest.mark.parametrize( - "factory", - [ - dummy_mirror_as_device_factory, - # The second test case confirms that if, for some reason, we use a device - # factory decorated with @lru_cache, dodal is not affected and will still cache - # the same device instance internally. We actually also use lru_cache - # internally so this test case is just a sanity check to prove it is - # idempotent. - cached_dummy_mirror_as_device_factory, - ], -) -def test_device_cached(factory: DeviceInitializationController): - mirror_1 = factory() - mirror_2 = factory() - assert mirror_1 is mirror_2 - - -def test_device_cache_can_be_cleared(): - mirror_1 = dummy_mirror_as_device_factory() - dummy_mirror_as_device_factory.cache_clear() - - mirror_2 = dummy_mirror_as_device_factory() - assert mirror_1 is not mirror_2 - - -def test_skip(): - skip = True - - def _skip() -> bool: - return skip - - controller = beamline_utils.device_factory(skip=_skip)(dummy_mirror) - - assert isinstance(controller, DeviceInitializationController) - assert controller.skip - - skip = False - assert not controller.skip - - -def test_clear_devices_destroys_ophyd_v1_devices(): - dev1 = beamline_utils.device_instantiation( - EigerDetector, - "eiger", - "", - True, - True, - None, - beamline="test", - ispyb_detector_id=0, - ) - dev1.destroy = MagicMock() - - beamline_utils.clear_devices() - - dev1.destroy.assert_called_once() - - -def test_clear_device_destroys_ophyd_v1_device(): - dev1 = beamline_utils.device_instantiation( - EigerDetector, - "eiger", - "", - True, - True, - None, - beamline="test", - ispyb_detector_id=0, - ) - dev1.destroy = MagicMock() - - beamline_utils.clear_device("eiger") - - dev1.destroy.assert_called_once() diff --git a/tests/common/beamlines/test_device_instantiation.py b/tests/common/beamlines/test_device_instantiation.py index e9ce27fd437..83daa384b1c 100644 --- a/tests/common/beamlines/test_device_instantiation.py +++ b/tests/common/beamlines/test_device_instantiation.py @@ -6,8 +6,7 @@ from dodal.beamlines import all_beamline_modules from dodal.common.beamlines.beamline_utils import clear_config_client, set_config_client -from dodal.device_manager import DeviceManager -from dodal.utils import BLUESKY_PROTOCOLS, make_all_devices +from dodal.utils import BLUESKY_PROTOCOLS from tests.test_data import I04_BEAMLINE_PARAMETERS, TEST_BEAMLINE_PARAMETERS_TXT @@ -54,34 +53,3 @@ def test_device_creation(module_and_devices_for_beamline): assert len(devices_not_following_bluesky_protocols) == 0, ( f"{devices_not_following_bluesky_protocols} do not follow bluesky protocols" ) - - -@pytest.mark.parametrize( - "module_and_devices_for_beamline", - set(all_beamline_modules()), - indirect=True, -) -def test_devices_are_identical(module_and_devices_for_beamline): - """Ensures that for every beamline all device functions prevent duplicate - instantiation. - """ - bl_mod, devices_a, _ = module_and_devices_for_beamline - if isinstance(getattr(bl_mod, "devices", None), DeviceManager): - # DeviceManager beamline modules do not cache device instances - return - - devices_b, _ = make_all_devices( - bl_mod, - include_skipped=True, - fake_with_ophyd_sim=True, - ) - non_identical_names = [ - device_name - for device_name, device in devices_a.items() - if device is not devices_b[device_name] - ] - total_number_of_devices = len(devices_a) - non_identical_number_of_devices = len(devices_a) - assert len(non_identical_names) == 0, ( - f"{non_identical_number_of_devices}/{total_number_of_devices} devices were not identical: {non_identical_names}" - ) diff --git a/tests/conftest.py b/tests/conftest.py index 6c058e2ea3a..8f48d948f03 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,6 @@ import importlib import logging import os -import sys from os import environ from pathlib import Path from types import ModuleType @@ -10,7 +9,7 @@ import pytest from ophyd_async.core import PathProvider -from dodal.common.beamlines import beamline_parameters, beamline_utils +from dodal.common.beamlines import beamline_parameters from dodal.common.beamlines.beamline_utils import ( clear_config_client, clear_path_provider, @@ -24,11 +23,6 @@ from dodal.devices.detector import DetectorParams from dodal.devices.detector.det_dim_constants import EIGER2_X_16M_SIZE from dodal.log import LOGGER, GELFTCPHandler, set_up_all_logging_handlers -from dodal.utils import ( - DeviceInitializationController, - collect_factories, - make_all_devices, -) from tests.devices.beamlines.i10.test_data import LOOKUP_TABLE_PATH from tests.devices.oav.test_data import TEST_DISPLAY_CONFIG, TEST_OAV_ZOOM_LEVELS from tests.devices.test_daq_configuration import MOCK_DAQ_CONFIG_PATH @@ -89,7 +83,6 @@ def patched_open(*args, **kwargs): def pytest_runtest_setup(item): - beamline_utils.clear_devices() if LOGGER.handlers == []: mock_graylog_handler = MagicMock(spec=GELFTCPHandler) mock_graylog_handler.return_value.level = logging.DEBUG @@ -100,11 +93,6 @@ def pytest_runtest_setup(item): ) -def pytest_runtest_teardown(): - if "dodal.beamlines.beamline_utils" in sys.modules: - sys.modules["dodal.beamlines.beamline_utils"].clear_devices() - - @pytest.fixture def dummy_visit_client() -> DirectoryServiceClient: return LocalDirectoryServiceClient() @@ -135,17 +123,7 @@ def module_and_devices_for_beamline(request: pytest.FixtureRequest): result.devices, result.connection_errors | result.build_errors, ) - else: - devices, exceptions = make_all_devices( - bl_mod, - include_skipped=True, - fake_with_ophyd_sim=True, - ) - yield (bl_mod, devices, exceptions) - beamline_utils.clear_devices() - for factory in collect_factories(bl_mod).values(): - if isinstance(factory, DeviceInitializationController): - factory.cache_clear() + yield (bl_mod, devices, exceptions) del bl_mod diff --git a/tests/fake_beamline.py b/tests/fake_beamline.py deleted file mode 100644 index 5bb66245bbe..00000000000 --- a/tests/fake_beamline.py +++ /dev/null @@ -1,38 +0,0 @@ -from unittest.mock import MagicMock - -from bluesky.protocols import Readable -from ophyd_async.epics.motor import Motor - -from dodal.devices.cryostream import OxfordCryoStream -from dodal.devices.diamond_filter import DiamondFilter, I03Filters -from dodal.utils import OphydV2Device - - -def device_a() -> Readable: - return _mock_with_name("readable") - - -def device_b() -> Motor: - return _mock_with_name("motor") - - -def device_c() -> OxfordCryoStream: - return _mock_with_name("cryo") - - -def generic_device_d() -> DiamondFilter[I03Filters]: - return _mock_with_name("diamond_filter") - - -def plain_ophyd_v2_device() -> OphydV2Device: - return _mock_with_name("ophyd_v2_device") - - -def not_device() -> int: - return 5 - - -def _mock_with_name(name: str) -> MagicMock: - mock = MagicMock() - mock.name = name - return mock diff --git a/tests/fake_beamline_broken_dependency.py b/tests/fake_beamline_broken_dependency.py deleted file mode 100644 index 438438067a0..00000000000 --- a/tests/fake_beamline_broken_dependency.py +++ /dev/null @@ -1,24 +0,0 @@ -from unittest.mock import MagicMock - -from bluesky.protocols import Readable -from ophyd_async.epics.motor import Motor - -from dodal.devices.cryostream import OxfordCryoStream - - -def device_x() -> Readable: - return _mock_with_name("readable") - - -def device_y() -> Motor: - raise AssertionError("Test failure") - - -def device_z(device_x: Readable, device_y: Motor) -> OxfordCryoStream: - return _mock_with_name("cryo") - - -def _mock_with_name(name: str) -> MagicMock: - mock = MagicMock() - mock.name = name - return mock diff --git a/tests/fake_beamline_dependencies.py b/tests/fake_beamline_dependencies.py deleted file mode 100644 index a48c8a4ffdb..00000000000 --- a/tests/fake_beamline_dependencies.py +++ /dev/null @@ -1,24 +0,0 @@ -from unittest.mock import MagicMock - -from bluesky.protocols import Readable -from ophyd_async.epics.motor import Motor - -from dodal.devices.cryostream import OxfordCryoStream - - -def device_x() -> Readable: - return _mock_with_name("readable") - - -def device_y() -> Motor: - return _mock_with_name("motor") - - -def device_z(device_x: Readable, device_y: Motor) -> OxfordCryoStream: - return _mock_with_name("cryo") - - -def _mock_with_name(name: str) -> MagicMock: - mock = MagicMock() - mock.name = name - return mock diff --git a/tests/fake_beamline_disordered_dependencies.py b/tests/fake_beamline_disordered_dependencies.py deleted file mode 100644 index 0520766734e..00000000000 --- a/tests/fake_beamline_disordered_dependencies.py +++ /dev/null @@ -1,24 +0,0 @@ -from unittest.mock import MagicMock - -from bluesky.protocols import Readable -from ophyd_async.epics.motor import Motor - -from dodal.devices.cryostream import OxfordCryoStream - - -def device_z(device_x: Readable, device_y: Motor) -> OxfordCryoStream: - return _mock_with_name("cryo") - - -def device_x() -> Readable: - return _mock_with_name("readable") - - -def device_y() -> Motor: - return _mock_with_name("motor") - - -def _mock_with_name(name: str) -> MagicMock: - mock = MagicMock() - mock.name = name - return mock diff --git a/tests/fake_beamline_misbehaving_builtins.py b/tests/fake_beamline_misbehaving_builtins.py deleted file mode 100644 index 6edaee1a688..00000000000 --- a/tests/fake_beamline_misbehaving_builtins.py +++ /dev/null @@ -1,42 +0,0 @@ -from math import hypot, log -from typing import TypedDict - -from ophyd.utils import DisconnectedError - -"""Some builtins (e.g. dict, Exception), types that extend -them, and aliases for them, do not have signature information. -PEP-8 recommends being conservative with adding typing information -to builtins and the core Python library, so this may change slowly. -This beamline uses some types or constructions that are known to -cause issue but that could conceivably be used in a beamline file. - - - Importing specific exceptions - - Importing functions from builtins, including math - - Aliasing builtins, including dict - - Defining a class that extends TypedDict (e.g. for parameters) - -""" - - -def not_a_device() -> None: - """Importing DisconnectedError is enough to cause issue, but we - use it here to prevent linting from removing it from the imports. - """ - raise DisconnectedError() - - -def also_not_a_device() -> float: - """Log and hypot both do not have signatures. - Not required to actually be used, importing is enough. - """ - return log(hypot(0, 0)) - - -a = dict -b = Exception - - -class B(TypedDict): - """Causes issue only if a class that extends TypedDict exists.""" - - foo: int diff --git a/tests/fake_beamline_some_devices_working.py b/tests/fake_beamline_some_devices_working.py deleted file mode 100644 index 225b39a4aac..00000000000 --- a/tests/fake_beamline_some_devices_working.py +++ /dev/null @@ -1,24 +0,0 @@ -from unittest.mock import MagicMock - -from bluesky.protocols import Readable -from ophyd_async.epics.motor import Motor - -from dodal.devices.undulator import UndulatorInKeV - - -def device_a() -> Readable: - return _mock_with_name("readable") - - -def device_b() -> Motor: - raise TimeoutError - - -def device_c() -> UndulatorInKeV: - return _mock_with_name("undulator") - - -def _mock_with_name(name: str) -> MagicMock: - mock = MagicMock() - mock.name = name - return mock diff --git a/tests/fake_device_factory_beamline.py b/tests/fake_device_factory_beamline.py deleted file mode 100644 index 4014644ba9b..00000000000 --- a/tests/fake_device_factory_beamline.py +++ /dev/null @@ -1,46 +0,0 @@ -from unittest.mock import AsyncMock, MagicMock - -import ophyd -from bluesky.protocols import Readable, Reading, SyncOrAsync -from event_model.documents.event_descriptor import DataKey -from ophyd_async.core import Device - -from dodal.common.beamlines.beamline_utils import device_factory, device_instantiation -from dodal.devices.cryostream import OxfordCryoStream - - -class ReadableDevice(Readable, Device): - def read(self) -> SyncOrAsync[dict[str, Reading]]: - return {} - - def describe(self) -> SyncOrAsync[dict[str, DataKey]]: - return {} - - -@device_factory(skip=True) -def device_a() -> ReadableDevice: - return ReadableDevice("readable") - - -@device_factory(skip=lambda: True) -def device_c() -> OxfordCryoStream: - return OxfordCryoStream("FOO:") - - -@device_factory(skip=True) -def mock_device(**kwargs) -> ReadableDevice: - device = MagicMock() - device.name = "mock_device" - device.connect = AsyncMock() - device.my_kwargs = kwargs - return device # type: ignore - - -@device_factory(skip=True) -def ophyd_v1_device(mock: bool = False, **kwargs) -> ophyd.Device: - device = device_instantiation( - ophyd.Device, "my_v1_device", "my_prefix", False, mock - ) - device.wait_for_connection = MagicMock() - device.my_kwargs = kwargs - return device diff --git a/tests/plans/test_save_panda.py b/tests/plans/test_save_panda.py index 48d1245f55a..4edf9fb6b48 100644 --- a/tests/plans/test_save_panda.py +++ b/tests/plans/test_save_panda.py @@ -4,7 +4,8 @@ import pytest from bluesky import RunEngine -from dodal.plans.save_panda import _save_panda, main +from dodal.device_manager import DeviceManager +from dodal.plans.save_panda import _build_panda, _save_panda, main @pytest.fixture(autouse=True) @@ -13,14 +14,31 @@ def patch_run_engine_in_save_panda_to_avoid_leaks(run_engine: RunEngine): yield +def test_build_panda(sim_run_engine): + panda = MagicMock() + panda_factory = MagicMock() + panda_factory.build.return_value = panda + + dev_man = MagicMock(spec=DeviceManager) + dev_man.__getitem__.side_effect = {"panda": panda_factory}.get + + with patch( + "dodal.plans.save_panda.importlib.import_module", + return_value=MagicMock(devices=dev_man), + ) as imp: + built_panda = _build_panda("i03", "panda") + imp.assert_called_once_with("dodal.beamlines.i03") + panda_factory.build.assert_called_once_with(connect_immediately=True) + assert built_panda is panda + + def test_save_panda(sim_run_engine): panda = MagicMock() + directory = "test" filename = "file.yml" with ( - patch( - "dodal.plans.save_panda.make_device", return_value={"panda": panda} - ) as mock_make_device, + patch("dodal.plans.save_panda._build_panda", return_value=panda) as build, patch( "dodal.plans.save_panda.RunEngine", return_value=MagicMock(side_effect=sim_run_engine.simulate_plan), @@ -30,9 +48,7 @@ def test_save_panda(sim_run_engine): ): _save_panda("i03", "panda", directory, filename) - mock_make_device.assert_called_with( - "dodal.beamlines.i03", "panda", connect_immediately=True - ) + build.assert_called_once_with("i03", "panda") mock_store_settings.assert_called_with( mock_settings_provider(), "file.yml", @@ -46,7 +62,7 @@ def test_save_panda(sim_run_engine): ) def test_save_panda_failure_to_create_device_exits_with_failure_code(mock_exit, tmpdir): with patch( - "dodal.plans.save_panda.make_device", + "dodal.plans.save_panda._build_panda", side_effect=ValueError("device does not exist"), ): with pytest.raises(AssertionError): diff --git a/tests/test_cli.py b/tests/test_cli.py index b2d492093ed..77f475a5b70 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -192,16 +192,11 @@ def test_cli_connect_when_devices_error( @patch("dodal.cli.importlib") -@patch("dodal.cli.make_all_devices") -@patch("dodal.cli._connect_devices") @patch.dict(os.environ, clear=True) -def test_missing_device_manager(connect, make, imp, runner: CliRunner): - # If the device manager cannot be found, it should fall back to the - # make_all_devices + _connect_devices approach. - make.return_value = ({}, {}) - runner.invoke(main, ["connect", "-n", "devices", "i22"]) - make.assert_called_once() - connect.assert_called_once() +def test_missing_device_manager(imp, runner: CliRunner): + res = runner.invoke(main, ["connect", "-n", "devices", "i22"]) + assert res.exit_code > 0 + assert "No device manager named 'devices' found in " in res.output @patch.dict(os.environ, clear=True) diff --git a/tests/test_device_manager.py b/tests/test_device_manager.py index e22b7e59ff4..e8f0afa7466 100644 --- a/tests/test_device_manager.py +++ b/tests/test_device_manager.py @@ -605,13 +605,12 @@ def test_v1_device_factory(dm: DeviceManager): def foo(_): pass - with patch("dodal.device_manager.wait_for_connection") as wfc: - devices = dm.build_all() + devices = dm.build_all() s1.assert_called_once_with(name="foo", prefix="S1_PREFIX") device = s1(name="foo", prefix="S1_PREFIX") assert devices.devices["foo"] is device - wfc.assert_called_once_with(device, timeout=DEFAULT_TIMEOUT) + s1().wait_for_connection.assert_called_once_with(timeout=DEFAULT_TIMEOUT) def test_v1_v2_name_clash(dm: DeviceManager): @@ -651,9 +650,8 @@ def test_v1_no_wait(dm: DeviceManager): def foo(_): pass - with patch("dodal.device_manager.wait_for_connection") as wfc: - foo.build() - wfc.assert_not_called() + foo.build() + s1().wait_for_connection.assert_not_called() def test_connect_ignores_v1(): diff --git a/tests/test_utils.py b/tests/test_utils.py index d4877ab2a4b..a289703c41b 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,115 +1,16 @@ import os -from collections.abc import Iterable, Mapping -from shutil import copytree -from typing import Any, cast -from unittest.mock import ANY, MagicMock, Mock, patch +from unittest.mock import MagicMock, patch import pytest -from bluesky.protocols import Readable -from ophyd_async.epics.motor import Motor from dodal.beamlines import i04, i23 -from dodal.devices.diamond_filter import DiamondFilter, I03Filters from dodal.utils import ( - AnyDevice, - OphydV1Device, - OphydV2Device, _find_next_run_number_from_files, - collect_factories, - filter_ophyd_devices, get_beamline_based_on_environment_variable, get_beamline_name, get_hostname, get_run_number, - is_v2_device_type, - make_all_devices, - make_device, ) -from tests.devices.test_daq_configuration import MOCK_DAQ_CONFIG_PATH - - -@pytest.fixture() -def alternate_config(tmp_path) -> str: - """Alternate config dir as MOCK_DAQ_CONFIG_PATH replaces i03.DAQ_CONFIGURATION_PATH - in conftest.py. - """ - alt_config_path = tmp_path / "alt_daq_configuration" - copytree(MOCK_DAQ_CONFIG_PATH, alt_config_path) - return str(alt_config_path) - - -@pytest.fixture() -def fake_device_factory_beamline(): - import tests.fake_device_factory_beamline as beamline - - factories = [ - f - for f in collect_factories(beamline, include_skipped=True).values() - if hasattr(f, "cache_clear") - ] - yield beamline - for f in factories: - f.cache_clear() # type: ignore - - -def test_finds_device_factories() -> None: - import tests.fake_beamline as fake_beamline - - factories = collect_factories(fake_beamline) - - from tests.fake_beamline import ( - device_a, - device_b, - device_c, - generic_device_d, - plain_ophyd_v2_device, - ) - - assert { - "device_a": device_a, - "device_b": device_b, - "device_c": device_c, - "plain_ophyd_v2_device": plain_ophyd_v2_device, - "generic_device_d": generic_device_d, - } == factories - - -def test_makes_devices() -> None: - import tests.fake_beamline as fake_beamline - - devices, exceptions = make_all_devices(fake_beamline) - assert { - "readable", - "motor", - "cryo", - "diamond_filter", - "ophyd_v2_device", - } == devices.keys() and len(exceptions) == 0 - - -def test_makes_devices_with_dependencies() -> None: - import tests.fake_beamline_dependencies as fake_beamline - - devices, exceptions = make_all_devices(fake_beamline) - assert {"readable", "motor", "cryo"} == devices.keys() and len(exceptions) == 0 - - -def test_makes_devices_with_disordered_dependencies() -> None: - import tests.fake_beamline_disordered_dependencies as fake_beamline - - devices, exceptions = make_all_devices(fake_beamline) - assert {"readable", "motor", "cryo"} == devices.keys() and len(exceptions) == 0 - - -def test_makes_devices_with_module_name() -> None: - devices, exceptions = make_all_devices("tests.fake_beamline") - assert { - "readable", - "motor", - "cryo", - "diamond_filter", - "ophyd_v2_device", - } == devices.keys() and len(exceptions) == 0 def test_get_hostname() -> None: @@ -118,195 +19,6 @@ def test_get_hostname() -> None: assert get_hostname() == "a" -def test_no_signature_builtins_not_devices() -> None: - import tests.fake_beamline_misbehaving_builtins as fake_beamline - - devices, exceptions = make_all_devices(fake_beamline) - assert len(devices) == 0 - assert len(exceptions) == 0 - - -def test_no_devices_when_all_factories_raise_exceptions() -> None: - import tests.fake_beamline_all_devices_raise_exception as fake_beamline - - devices, exceptions = make_all_devices(fake_beamline) - assert len(devices) == 0 - assert len(exceptions) == 3 and all( - isinstance(e, Exception) for e in exceptions.values() - ) - - -def test_some_devices_when_some_factories_raise_exceptions() -> None: - import tests.fake_beamline_some_devices_working as fake_beamline - - devices, exceptions = make_all_devices(fake_beamline) - assert len(devices) == 2 - assert len(exceptions) == 1 and all( - isinstance(e, Exception) for e in exceptions.values() - ) - - -def test_make_device_with_dependency(): - import tests.fake_beamline_dependencies as fake_beamline - - devices = make_device(fake_beamline, "device_z") - assert devices.keys() == {"device_x", "device_y", "device_z"} - - -def test_make_device_no_dependency(): - import tests.fake_beamline_dependencies as fake_beamline - - devices = make_device(fake_beamline, "device_x") - assert devices.keys() == {"device_x"} - - -def test_make_device_with_exception(): - import tests.fake_beamline_all_devices_raise_exception as fake_beamline - - with pytest.raises(ValueError): - make_device(fake_beamline, "device_c") - - -def test_make_device_with_module_name(): - devices = make_device("tests.fake_beamline", "device_a") - assert {"device_a"} == devices.keys() - - -def test_make_device_no_factory(): - import tests.fake_beamline_dependencies as fake_beamline - - with pytest.raises(ValueError): - make_device(fake_beamline, "this_device_does_not_exist") - - -def test_make_device_dependency_throws(): - import tests.fake_beamline_broken_dependency as fake_beamline - - with pytest.raises(RuntimeError): - make_device(fake_beamline, "device_z") - - -def test_device_factory_skips(fake_device_factory_beamline): - devices, exceptions = make_all_devices(fake_device_factory_beamline) - assert len(devices) == 0 - assert len(exceptions) == 0 - - -def test_device_factory_can_ignore_skip(fake_device_factory_beamline): - devices, exceptions = make_all_devices( - fake_device_factory_beamline, include_skipped=True - ) - assert len(devices) == 4 - assert len(exceptions) == 0 - - -def test_device_factory_can_construct_ophyd_v1_devices(fake_device_factory_beamline): - device = fake_device_factory_beamline.ophyd_v1_device( - connect_immediately=True, mock=True, connection_timeout=4.5 - ) - - device.wait_for_connection.assert_called_once_with(timeout=4.5) # type: ignore - - -def test_device_factory_passes_kwargs_to_wrapped_factory_v1( - fake_device_factory_beamline, -): - device = fake_device_factory_beamline.ophyd_v1_device( - connect_immediately=True, - mock=True, - my_int_kwarg=123, - my_str_kwarg="abc", - my_float_kwarg=1.23, - ) - - assert device.my_kwargs == { - "my_int_kwarg": 123, - "my_str_kwarg": "abc", - "my_float_kwarg": 1.23, - } - - -def test_device_factory_passes_kwargs_to_wrapped_factory_v2( - fake_device_factory_beamline, -): - device = fake_device_factory_beamline.mock_device( - connect_immediately=True, - mock=True, - my_int_kwarg=123, - my_str_kwarg="abc", - my_float_kwarg=1.23, - ) - - assert device.my_kwargs == { # type: ignore - "my_int_kwarg": 123, - "my_str_kwarg": "abc", - "my_float_kwarg": 1.23, - } - - -def test_fake_with_ophyd_sim_passed_to_device_factory(fake_device_factory_beamline): - fake_device_factory_beamline.mock_device.cache_clear() - - devices, exceptions = make_all_devices( - fake_device_factory_beamline, - include_skipped=True, - fake_with_ophyd_sim=True, - connect_immediately=True, - ) - if "mock_device" in exceptions: - raise exceptions["mock_device"] - mock_device = cast(Mock, devices["mock_device"]) - mock_device.connect.assert_called_once_with(timeout=ANY, mock=True) - - -def test_mock_passed_to_device_factory(fake_device_factory_beamline): - fake_device_factory_beamline.mock_device.cache_clear() - - devices, exceptions = make_all_devices( - fake_device_factory_beamline, - include_skipped=True, - mock=True, - connect_immediately=True, - ) - if "mock_device" in exceptions: - raise exceptions["mock_device"] - mock_device = cast(Mock, devices["mock_device"]) - mock_device.connect.assert_called_once_with(timeout=ANY, mock=True) - - -def test_connect_immediately_passed_to_device_factory(fake_device_factory_beamline): - fake_device_factory_beamline.mock_device.cache_clear() - - devices, exceptions = make_all_devices( - fake_device_factory_beamline, - include_skipped=True, - connect_immediately=False, - ) - if "mock_device" in exceptions: - raise exceptions["mock_device"] - mock_device = cast(Mock, devices["mock_device"]) - mock_device.connect.assert_not_called() - - -def test_device_factory_can_rename(fake_device_factory_beamline): - cryo = fake_device_factory_beamline.device_c(mock=True, connect_immediately=True) - assert cryo.name == "device_c" - assert cryo.temp.name == "device_c-temp" - - cryo_2 = fake_device_factory_beamline.device_c(name="cryo") - assert cryo is cryo_2 - assert cryo_2.name == "cryo" - assert cryo_2.temp.name == "cryo-temp" - - -def device_a() -> Readable: - return MagicMock() - - -def device_b() -> Motor: - return MagicMock() - - @pytest.mark.parametrize("bl", ["", "$%^&*", "nonexistent"]) def test_invalid_beamline_variable_causes_get_device_module_to_raise(bl): with patch.dict(os.environ, {"BEAMLINE": bl}), pytest.raises(ValueError): @@ -383,110 +95,6 @@ def test_get_run_number_uses_prefix(mock_list_dir: MagicMock): assert get_run_number("dir", "qux") == 1 -OPHYD_DEVICE_A = OphydV1Device(prefix="FOO", name="OPHYD_DEVICE_A") -OPHYD_DEVICE_B = OphydV1Device(prefix="BAR", name="OPHYD_DEVICE_B") - -OPHYD_ASYNC_DEVICE_A = OphydV2Device(name="OPHYD_ASYNC_DEVICE_A") -OPHYD_ASYNC_DEVICE_B = OphydV2Device(name="OPHYD_ASYNC_DEVICE_B") - - -def _filtering_test_cases() -> Iterable[ - tuple[ - Mapping[str, AnyDevice], - Mapping[str, OphydV1Device], - Mapping[str, OphydV2Device], - ] -]: - yield {}, {}, {} - yield ( - {"oa": OPHYD_DEVICE_A}, - {"oa": OPHYD_DEVICE_A}, - {}, - ) - yield ( - {"aa": OPHYD_ASYNC_DEVICE_A}, - {}, - {"aa": OPHYD_ASYNC_DEVICE_A}, - ) - yield ( - {"oa": OPHYD_DEVICE_A, "ob": OPHYD_DEVICE_B}, - {"oa": OPHYD_DEVICE_A, "ob": OPHYD_DEVICE_B}, - {}, - ) - yield ( - { - "aa": OPHYD_ASYNC_DEVICE_A, - "ab": OPHYD_ASYNC_DEVICE_B, - }, - {}, - { - "aa": OPHYD_ASYNC_DEVICE_A, - "ab": OPHYD_ASYNC_DEVICE_B, - }, - ) - yield ( - { - "oa": OPHYD_DEVICE_A, - "aa": OPHYD_ASYNC_DEVICE_A, - }, - {"oa": OPHYD_DEVICE_A}, - {"aa": OPHYD_ASYNC_DEVICE_A}, - ) - yield ( - { - "oa": OPHYD_DEVICE_A, - "aa": OPHYD_ASYNC_DEVICE_A, - "ob": OPHYD_DEVICE_B, - "ab": OPHYD_ASYNC_DEVICE_B, - }, - {"oa": OPHYD_DEVICE_A, "ob": OPHYD_DEVICE_B}, - { - "aa": OPHYD_ASYNC_DEVICE_A, - "ab": OPHYD_ASYNC_DEVICE_B, - }, - ) - - -@pytest.mark.parametrize( - "all_devices,expected_ophyd_devices,expected_ophyd_async_devices", - list(_filtering_test_cases()), -) -def test_filter_ophyd_devices_filters_ophyd_devices( - all_devices: Mapping[str, AnyDevice], - expected_ophyd_devices: Mapping[str, OphydV1Device], - expected_ophyd_async_devices: Mapping[str, OphydV2Device], -): - ophyd_devices, ophyd_async_devices = filter_ophyd_devices(all_devices) - assert ophyd_devices == expected_ophyd_devices - assert ophyd_async_devices == expected_ophyd_async_devices - - -def test_filter_ophyd_devices_raises_for_extra_types(): - with pytest.raises(ValueError): - ophyd_devices, ophyd_async_devices = filter_ophyd_devices( - { - "oa": OphydV1Device(prefix="", name="oa"), - "aa": OphydV2Device(name="aa"), - "ab": 3, # type: ignore - } - ) - - -@pytest.mark.parametrize( - "input, expected_result", - [ - [Readable, False], - [OphydV1Device, False], - [OphydV2Device, True], - [DiamondFilter[I03Filters], True], - [None, False], - [1, False], - ], -) -def test_is_v2_device_type(input: Any, expected_result: bool): - assert is_v2_device_type(input) == expected_result - - def test_get_beamline_name_raises_error_if_environment_variable_not_set_and_no_default_given( monkeypatch, ):