diff --git a/nslsii/__init__.py b/nslsii/__init__.py index fc4f100e..bd9b808b 100644 --- a/nslsii/__init__.py +++ b/nslsii/__init__.py @@ -452,7 +452,7 @@ def configure_ipython_logging( return bluesky_ipython_log_file_path -def configure_kafka_publisher(RE, beamline_name, override_config_path=None): +def configure_kafka_publisher(RE, beamline_name, override_config_path=None, **kwargs): """ Read a Kafka configuration file and subscribe a Kafka publisher to the RunEngine. A configuration file is required. Environment variable BLUESKY_KAFKA_CONFIG_FILE @@ -486,7 +486,8 @@ def configure_kafka_publisher(RE, beamline_name, override_config_path=None): RE, beamline_name=beamline_name, bootstrap_servers=bootstrap_servers, - producer_config=bluesky_kafka_configuration["runengine_producer_config"] + producer_config=bluesky_kafka_configuration["runengine_producer_config"], + **kwargs ) else: kafka_publisher_details = _subscribe_kafka_queue_thread_publisher( @@ -496,6 +497,7 @@ def configure_kafka_publisher(RE, beamline_name, override_config_path=None): producer_config=bluesky_kafka_configuration[ "runengine_producer_config" ], + *kwargs ) return bluesky_kafka_configuration, kafka_publisher_details @@ -680,25 +682,29 @@ def _read_bluesky_kafka_config_file(config_file_path): ) -def _subscribe_kafka_publisher(RE, beamline_name, bootstrap_servers, producer_config, _publisher_factory=None): +def _subscribe_kafka_publisher(RE, beamline_name, bootstrap_servers, producer_config, _publisher_factory=None, *, + document_source='runengine'): """ Subscribe a RunRouter to the specified RE to create Kafka Publishers. Each Publisher will publish documents from a single run to the - Kafka topic ".bluesky.runengine.documents". + Kafka topic ".bluesky..documents". + Parameters ---------- - RE: RunEngine + RE : RunEngine the RunEngine to which the RunRouter will be subscribed - beamline_name: str + beamline_name : str beamline start_name, for example "csx", to be used in building the Kafka topic to which messages will be published - bootstrap_servers: str + bootstrap_servers : str Comma-delimited list of Kafka server addresses as a string such as ``'10.0.137.8:9092'`` - producer_config: dict + producer_config : dict dictionary of Kafka Producer configuration settings - _publisher_factory: callable, optional + _publisher_factory : callable, optional intended only for testing, default is bluesky_kafka.Publisher, optionally specify a callable that constructs a Publisher-like object + document_source : str, optional + The document source. Returns ------- @@ -712,7 +718,7 @@ def _subscribe_kafka_publisher(RE, beamline_name, bootstrap_servers, producer_co from bluesky_kafka.utils import list_topics from event_model import RunRouter - topic = f"{beamline_name.lower()}.bluesky.runengine.documents" + topic = f"{beamline_name.lower()}.bluesky.{document_source}.documents" if _publisher_factory is None: _publisher_factory = Publisher @@ -795,7 +801,7 @@ def publish_or_abort_run(name_, doc_): def _subscribe_kafka_queue_thread_publisher( - RE, beamline_name, bootstrap_servers, producer_config, publisher_queue_timeout=1 + RE, beamline_name, bootstrap_servers, producer_config, publisher_queue_timeout=1, document_source='runengine' ): """ Create and start a separate thread to publish bluesky documents as Kafka @@ -818,6 +824,8 @@ def _subscribe_kafka_queue_thread_publisher( such as ``'kafka1:9092,kafka2:9092`` producer_config: dict dictionary of Kafka Producer configuration settings + document_source : str, optional + The document source. Returns ------- @@ -830,20 +838,17 @@ def _subscribe_kafka_queue_thread_publisher( un-subscribe the function from the RunEngine, in case someone ever wants to do that. """ - from bluesky_kafka import BlueskyKafkaException from bluesky_kafka.tools.queue_thread import build_kafka_publisher_queue_and_thread nslsii_logger = logging.getLogger("nslsii") beamline_runengine_topic = None - kafka_publisher_token = None - publisher_thread_stop_event = None kafka_publisher_re_token = None publisher_queue_thread_details = None try: nslsii_logger.info("connecting to Kafka broker(s): '%s'", bootstrap_servers) beamline_runengine_topic = ( - f"{beamline_name.lower()}.bluesky.runengine.documents" + f"{beamline_name.lower()}.bluesky.{document_source}.documents" ) publisher_queue_thread_details = build_kafka_publisher_queue_and_thread( @@ -853,8 +858,6 @@ def _subscribe_kafka_queue_thread_publisher( publisher_queue_timeout=publisher_queue_timeout, ) - publisher_thread_stop_event = publisher_queue_thread_details.publisher_thread_stop_event - kafka_publisher_re_token = RE.subscribe( publisher_queue_thread_details.put_on_publisher_queue )