diff --git a/src/murfey/cli/inject_spa_processing.py b/src/murfey/cli/inject_spa_processing.py index 69f350c6a..9a55a2ef9 100644 --- a/src/murfey/cli/inject_spa_processing.py +++ b/src/murfey/cli/inject_spa_processing.py @@ -13,7 +13,6 @@ from murfey.util.config import get_machine_config, get_microscope, get_security_config from murfey.util.db import ( AutoProcProgram, - ClassificationFeedbackParameters, ClientEnvironment, DataCollection, DataCollectionGroup, @@ -136,15 +135,11 @@ def run(): .where(AutoProcProgram.pj_id == ProcessingJob.id) .where(ProcessingJob.recipe == "em-spa-preprocess") ).one() - params = murfey_db.exec( - select(SPARelionParameters, ClassificationFeedbackParameters) - .where(SPARelionParameters.pj_id == collected_ids[2].id) - .where(ClassificationFeedbackParameters.pj_id == SPARelionParameters.pj_id) + proc_params = murfey_db.exec( + select(SPARelionParameters).where( + SPARelionParameters.pj_id == collected_ids[2].id + ) ).one() - proc_params: dict | None = dict(params[0]) - feedback_params = params[1] - if feedback_params.picker_murfey_id is None: - raise ValueError("No ISPyB picker ID was found") except sqlalchemy.exc.NoResultFound: proc_params = None @@ -196,7 +191,6 @@ def run(): "ft_bin": proc_params["motion_corr_binning"], "fm_dose": proc_params["dose_per_frame"], "gain_ref": proc_params["gain_ref"], - "picker_uuid": feedback_params.picker_murfey_id, "session_id": args.session_id, "particle_diameter": proc_params["particle_diameter"] or 0, "fm_int_file": args.eer_fractionation_file, diff --git a/src/murfey/server/api/auth.py b/src/murfey/server/api/auth.py index b0d5f292e..8c8988bf2 100644 --- a/src/murfey/server/api/auth.py +++ b/src/murfey/server/api/auth.py @@ -110,6 +110,8 @@ async def submit_to_auth_endpoint( Helper function to forward incoming requests to an authentication server to verify that they are allowed to inspect the """ + if security_config.auth_type == "none": + return {"valid": True} # Forward only essentials 
auth-related headers headers = { @@ -189,6 +191,9 @@ async def validate_instrument_token( """ Used by the backend routers to check the incoming instrument server token. """ + if security_config.instrument_auth_type == "none": + return None + try: # Validate using auth URL if provided if security_config.instrument_auth_url: diff --git a/src/murfey/server/api/session_info.py b/src/murfey/server/api/session_info.py index 64bde37fa..85acdb9bf 100644 --- a/src/murfey/server/api/session_info.py +++ b/src/murfey/server/api/session_info.py @@ -129,7 +129,7 @@ def all_visit_info( @router.get("/sessions/{session_id}/rsyncers", response_model=List[RsyncInstance]) -def get_rsyncers_for_client(session_id: MurfeySessionID, db=murfey_db): +def get_rsyncers_for_client(session_id: MurfeySessionID, db: Session = murfey_db): rsync_instances = db.exec( select(RsyncInstance).where(RsyncInstance.session_id == session_id) ) @@ -142,7 +142,9 @@ class SessionClients(BaseModel): @router.get("/sessions/{session_id}") -async def get_session(session_id: MurfeySessionID, db=murfey_db) -> SessionClients: +async def get_session( + session_id: MurfeySessionID, db: Session = murfey_db +) -> SessionClients: session = db.exec(select(Session).where(Session.id == session_id)).one() clients = db.exec( select(ClientEnvironment).where(ClientEnvironment.session_id == session_id) @@ -151,7 +153,7 @@ async def get_session(session_id: MurfeySessionID, db=murfey_db) -> SessionClien @router.get("/sessions") -async def get_sessions(db=murfey_db): +async def get_sessions(db: Session = murfey_db): sessions = db.exec(select(Session)).all() clients = db.exec(select(ClientEnvironment)).all() res = [] @@ -174,7 +176,7 @@ def create_session( visit: str, name: str, visit_end_time: VisitEndTime, - db=murfey_db, + db: Session = murfey_db, ) -> int: s = Session( name=name, @@ -190,7 +192,7 @@ def create_session( @router.post("/sessions/{session_id}") def update_session( - session_id: MurfeySessionID, process: bool = True, 
db=murfey_db + session_id: MurfeySessionID, process: bool = True, db: Session = murfey_db ) -> None: session = db.exec(select(Session).where(Session.id == session_id)).one() session.process = process @@ -200,13 +202,13 @@ def update_session( @router.delete("/sessions/{session_id}") -def remove_session(session_id: MurfeySessionID, db=murfey_db): +def remove_session(session_id: MurfeySessionID, db: Session = murfey_db): remove_session_by_id(session_id, db) @router.get("/instruments/{instrument_name}/visits/{visit_name}/sessions") def get_sessions_with_visit( - instrument_name: MurfeyInstrumentName, visit_name: str, db=murfey_db + instrument_name: MurfeyInstrumentName, visit_name: str, db: Session = murfey_db ) -> List[Session]: sessions = db.exec( select(Session) @@ -218,7 +220,7 @@ def get_sessions_with_visit( @router.get("/instruments/{instrument_name}/sessions") async def get_sessions_by_instrument_name( - instrument_name: MurfeyInstrumentName, db=murfey_db + instrument_name: MurfeyInstrumentName, db: Session = murfey_db ) -> List[Session]: sessions = db.exec( select(Session).where(Session.instrument_name == instrument_name) @@ -228,7 +230,7 @@ async def get_sessions_by_instrument_name( @router.get("/sessions/{session_id}/data_collection_groups") def get_dc_groups( - session_id: MurfeySessionID, db=murfey_db + session_id: MurfeySessionID, db: Session = murfey_db ) -> Dict[str, DataCollectionGroup]: data_collection_groups = db.exec( select(DataCollectionGroup).where(DataCollectionGroup.session_id == session_id) @@ -238,7 +240,7 @@ def get_dc_groups( @router.get("/sessions/{session_id}/data_collection_groups/{dcgid}/data_collections") def get_data_collections( - session_id: MurfeySessionID, dcgid: int, db=murfey_db + session_id: MurfeySessionID, dcgid: int, db: Session = murfey_db ) -> List[DataCollection]: data_collections = db.exec( select(DataCollection).where(DataCollection.dcg_id == dcgid) @@ -247,7 +249,7 @@ def get_data_collections( @router.get("/clients") 
-async def get_clients(db=murfey_db): +async def get_clients(db: Session = murfey_db): clients = db.exec(select(ClientEnvironment)).all() return clients @@ -258,7 +260,7 @@ class CurrentGainRef(BaseModel): @router.put("/sessions/{session_id}/current_gain_ref") def update_current_gain_ref( - session_id: MurfeySessionID, new_gain_ref: CurrentGainRef, db=murfey_db + session_id: MurfeySessionID, new_gain_ref: CurrentGainRef, db: Session = murfey_db ): session = db.exec(select(Session).where(Session.id == session_id)).one() session.current_gain_ref = new_gain_ref.path @@ -293,7 +295,7 @@ class ProcessingDetails(BaseModel): @spa_router.get("/sessions/{session_id}/spa_processing_parameters") def get_spa_proc_param_details( - session_id: MurfeySessionID, db=murfey_db + session_id: MurfeySessionID, db: Session = murfey_db ) -> Optional[List[ProcessingDetails]]: params = db.exec( select( @@ -342,7 +344,7 @@ def _parse(ps, i, dcg_id): "/sessions/{session_id}/data_collection_groups/{dcgid}/grid_squares/{gsid}/foil_holes/{fhid}/num_movies" ) def get_number_of_movies_from_foil_hole( - session_id: int, dcgid: int, gsid: int, fhid: int, db=murfey_db + session_id: int, dcgid: int, gsid: int, fhid: int, db: Session = murfey_db ) -> int: movies = db.exec( select(Movie, FoilHole, GridSquare, DataCollectionGroup) @@ -358,13 +360,13 @@ def get_number_of_movies_from_foil_hole( @spa_router.get("/sessions/{session_id}/grid_squares") -def get_grid_squares(session_id: MurfeySessionID, db=murfey_db): +def get_grid_squares(session_id: MurfeySessionID, db: Session = murfey_db): return _get_grid_squares(session_id, db) @spa_router.get("/sessions/{session_id}/data_collection_groups/{dcgid}/grid_squares") def get_grid_squares_from_dcg( - session_id: MurfeySessionID, dcgid: int, db=murfey_db + session_id: MurfeySessionID, dcgid: int, db: Session = murfey_db ) -> List[GridSquare]: return _get_grid_squares_from_dcg(session_id, dcgid, db) @@ -373,14 +375,14 @@ def get_grid_squares_from_dcg( 
"/sessions/{session_id}/data_collection_groups/{dcgid}/grid_squares/{gsid}/foil_holes" ) def get_foil_holes_from_grid_square( - session_id: MurfeySessionID, dcgid: int, gsid: int, db=murfey_db + session_id: MurfeySessionID, dcgid: int, gsid: int, db: Session = murfey_db ) -> List[FoilHole]: return _get_foil_holes_from_grid_square(session_id, dcgid, gsid, db) @spa_router.get("/sessions/{session_id}/foil_hole/{fh_name}") def get_foil_hole( - session_id: MurfeySessionID, fh_name: int, db=murfey_db + session_id: MurfeySessionID, fh_name: int, db: Session = murfey_db ) -> Dict[str, int]: return _get_foil_hole(session_id, fh_name, db) @@ -394,7 +396,7 @@ def get_foil_hole( @tomo_router.get("/sessions/{session_id}/tilt_series/{tilt_series_tag}/tilts") def get_tilts( - session_id: MurfeySessionID, tilt_series_tag: str, db=murfey_db + session_id: MurfeySessionID, tilt_series_tag: str, db: Session = murfey_db ) -> Dict[str, List[str]]: res = db.exec( select(TiltSeries, Tilt) @@ -419,7 +421,7 @@ def get_tilts( @correlative_router.get("/sessions/{session_id}/upstream_visits") -async def find_upstream_visits(session_id: MurfeySessionID, db=murfey_db): +async def find_upstream_visits(session_id: MurfeySessionID, db: Session = murfey_db): return _find_upstream_visits(session_id=session_id, db=db) @@ -430,7 +432,7 @@ async def gather_upstream_files( visit_name: str, session_id: MurfeySessionID, upstream_file_request: UpstreamFileRequestInfo, - db=murfey_db, + db: Session = murfey_db, ): return _gather_upstream_files( session_id=session_id, @@ -447,7 +449,7 @@ async def get_upstream_file( visit_name: str, session_id: MurfeySessionID, upstream_file_path: Path, - db=murfey_db, + db: Session = murfey_db, ): upstream_file = _get_upstream_file(upstream_file_path) return ( @@ -458,14 +460,18 @@ async def get_upstream_file( @correlative_router.get( "/visits/{visit_name}/sessions/{session_id}/upstream_tiff_paths" ) -async def gather_upstream_tiffs(visit_name: str, session_id: int, 
db=murfey_db): +async def gather_upstream_tiffs( + visit_name: str, session_id: int, db: Session = murfey_db +): return _gather_upstream_tiffs(visit_name=visit_name, session_id=session_id, db=db) @correlative_router.get( "/visits/{visit_name}/sessions/{session_id}/upstream_tiff/{tiff_path:path}" ) -async def get_tiff_file(visit_name: str, session_id: int, tiff_path: str, db=murfey_db): +async def get_tiff_file( + visit_name: str, session_id: int, tiff_path: str, db: Session = murfey_db +): tiff_file = _get_tiff_file( visit_name=visit_name, session_id=session_id, tiff_path=tiff_path, db=db ) diff --git a/src/murfey/server/api/workflow.py b/src/murfey/server/api/workflow.py index 4d27fe9b5..f686a9361 100644 --- a/src/murfey/server/api/workflow.py +++ b/src/murfey/server/api/workflow.py @@ -16,7 +16,7 @@ BLSubSample, ) from pydantic import BaseModel -from sqlalchemy.exc import OperationalError +from sqlalchemy.exc import NoResultFound, OperationalError from sqlmodel import col, select from werkzeug.utils import secure_filename @@ -25,6 +25,8 @@ except ImportError: Image = None +import murfey.server + try: from smartem_backend.api_client import SmartEMAPIClient from smartem_common.schemas import ( @@ -39,7 +41,6 @@ SMARTEM_ACTIVE = False import murfey.server.prometheus as prom -from murfey.server import _transport_object from murfey.server.api.auth import ( MurfeySessionIDInstrument as MurfeySessionID, validate_instrument_token, @@ -58,7 +59,6 @@ from murfey.util.config import get_machine_config from murfey.util.db import ( AutoProcProgram, - ClassificationFeedbackParameters, DataCollection, DataCollectionGroup, FoilHole, @@ -118,7 +118,7 @@ def register_dc_group( visit_name: str, session_id: MurfeySessionID, dcg_params: DCGroupParameters, - db=murfey_db, + db: Session = murfey_db, ): ispyb_proposal_code = visit_name[:2] ispyb_proposal_number = visit_name.split("-")[0][2:] @@ -184,10 +184,10 @@ def register_dc_group( if smartem_grid_uuid: dcg_instance.smartem_grid_uuid 
= smartem_grid_uuid - if _transport_object: + if murfey.server._transport_object: if dcg_instance.atlas_id is not None: - _transport_object.send( - _transport_object.feedback_queue, + murfey.server._transport_object.send( + murfey.server._transport_object.feedback_queue, { "register": "atlas_update", "tag": dcg_instance.tag, @@ -200,7 +200,7 @@ def register_dc_group( }, ) else: - atlas_id_response = _transport_object.do_insert_atlas( + atlas_id_response = murfey.server._transport_object.do_insert_atlas( Atlas( dataCollectionGroupId=dcg_instance.id, atlasImage=dcg_params.atlas, @@ -234,9 +234,9 @@ def register_dc_group( # Case where we switch from atlas to processing original_tag = dcg_murfey[0].tag dcg_murfey[0].tag = dcg_params.tag or dcg_murfey[0].tag - if _transport_object: - _transport_object.send( - _transport_object.feedback_queue, + if murfey.server._transport_object: + murfey.server._transport_object.send( + murfey.server._transport_object.feedback_queue, { "register": "experiment_type_update", "experiment_type_id": dcg_params.experiment_type_id, @@ -263,9 +263,9 @@ def register_dc_group( "atlas_pixel_size": dcg_params.atlas_pixel_size, } - if _transport_object: - _transport_object.send( - _transport_object.feedback_queue, + if murfey.server._transport_object: + murfey.server._transport_object.send( + murfey.server._transport_object.feedback_queue, { "register": "data_collection_group", **dcg_parameters, @@ -305,7 +305,10 @@ class DCParameters(BaseModel): @router.post("/visits/{visit_name}/sessions/{session_id}/start_data_collection") def start_dc( - visit_name: str, session_id: MurfeySessionID, dc_params: DCParameters, db=murfey_db + visit_name: str, + session_id: MurfeySessionID, + dc_params: DCParameters, + db: Session = murfey_db, ): ispyb_proposal_code = visit_name[:2] ispyb_proposal_number = visit_name.split("-")[0][2:] @@ -343,9 +346,9 @@ def start_dc( "session_id": session_id, } - if _transport_object: - _transport_object.send( - 
_transport_object.feedback_queue, + if murfey.server._transport_object: + murfey.server._transport_object.send( + murfey.server._transport_object.feedback_queue, { "register": "data_collection", **dc_parameters, @@ -373,7 +376,7 @@ def register_proc( visit_name: str, session_id: MurfeySessionID, proc_params: ProcessingJobParameters, - db=murfey_db, + db: Session = murfey_db, ): proc_parameters: dict = { "session_id": session_id, @@ -406,9 +409,9 @@ def register_proc( ) proc_parameters["job_parameters"] = job_parameters - if _transport_object: - _transport_object.send( - _transport_object.feedback_queue, + if murfey.server._transport_object: + murfey.server._transport_object.send( + murfey.server._transport_object.feedback_queue, {"register": "processing_job", **proc_parameters}, ) return proc_params @@ -423,7 +426,9 @@ def register_proc( @spa_router.post("/sessions/{session_id}/spa_processing_parameters") def register_spa_proc_params( - session_id: MurfeySessionID, proc_params: ProcessingParametersSPA, db=murfey_db + session_id: MurfeySessionID, + proc_params: ProcessingParametersSPA, + db: Session = murfey_db, ): session_processing_parameters = db.exec( select(SessionProcessingParameters).where( @@ -443,8 +448,16 @@ def register_spa_proc_params( **dict(proc_params), "session_id": session_id, } - if _transport_object: - _transport_object.send(_transport_object.feedback_queue, zocalo_message) + if murfey.server._transport_object: + murfey.server._transport_object.send( + murfey.server._transport_object.feedback_queue, zocalo_message + ) + else: + logger.error( + f"Pre-processing was requested for {session_id} " + "but no Zocalo transport object was found" + ) + return proc_params class Tag(BaseModel): @@ -453,15 +466,17 @@ class Tag(BaseModel): @spa_router.post("/visits/{visit_name}/sessions/{session_id}/flush_spa_processing") def flush_spa_processing( - visit_name: str, session_id: MurfeySessionID, tag: Tag, db=murfey_db + visit_name: str, session_id: 
MurfeySessionID, tag: Tag, db: Session = murfey_db ): zocalo_message = { "register": "spa.flush_spa_preprocess", "session_id": session_id, "tag": tag.tag, } - if _transport_object: - _transport_object.send(_transport_object.feedback_queue, zocalo_message) + if murfey.server._transport_object: + murfey.server._transport_object.send( + murfey.server._transport_object.feedback_queue, zocalo_message + ) return @@ -488,7 +503,7 @@ async def request_spa_preprocessing( visit_name: str, session_id: MurfeySessionID, proc_file: SPAProcessFile, - db=murfey_db, + db: Session = murfey_db, ): instrument_name = ( db.exec(select(Session).where(Session.id == session_id)).one().instrument_name @@ -507,13 +522,11 @@ async def request_spa_preprocessing( .where(AutoProcProgram.pj_id == ProcessingJob.id) .where(ProcessingJob.recipe == "em-spa-preprocess") ).one() - params = db.exec( - select(SPARelionParameters, ClassificationFeedbackParameters) - .where(SPARelionParameters.pj_id == collected_ids[2].id) - .where(ClassificationFeedbackParameters.pj_id == SPARelionParameters.pj_id) + proc_params = db.exec( + select(SPARelionParameters).where( + SPARelionParameters.pj_id == collected_ids[2].id + ) ).one() - proc_params: Optional[dict] = dict(params[0]) - feedback_params = params[1] except sqlalchemy.exc.NoResultFound: proc_params = None try: @@ -528,6 +541,9 @@ async def request_spa_preprocessing( .one()[0] .id ) + except NoResultFound: + logger.warning("No foil hole ID found") + foil_hole_id = None except Exception as e: logger.warning( f"Foil hole ID not found for foil hole {sanitise(str(proc_file.foil_hole_id))}: {e}", @@ -538,10 +554,6 @@ async def request_spa_preprocessing( detached_ids = [c.id for c in collected_ids] murfey_ids = _murfey_id(detached_ids[3], db, number=2, close=False) - - if feedback_params.picker_murfey_id is None: - feedback_params.picker_murfey_id = murfey_ids[1] - db.add(feedback_params) movie = Movie( murfey_id=murfey_ids[0], data_collection_id=detached_ids[1], 
@@ -644,12 +656,11 @@ async def request_spa_preprocessing( ), }, } - # log.info(f"Sending Zocalo message {zocalo_message}") - if _transport_object: + if murfey.server._transport_object: zocalo_message["parameters"]["feedback_queue"] = ( - _transport_object.feedback_queue + murfey.server._transport_object.feedback_queue ) - _transport_object.send("processing_recipe", zocalo_message) + murfey.server._transport_object.send("processing_recipe", zocalo_message) else: logger.error( f"Pre-processing was requested for {sanitise(Path(proc_file.path).name)} " @@ -683,7 +694,9 @@ async def request_spa_preprocessing( @tomo_router.post("/sessions/{session_id}/tomography_processing_parameters") def register_tomo_proc_params( - session_id: MurfeySessionID, proc_params: ProcessingParametersTomo, db=murfey_db + session_id: MurfeySessionID, + proc_params: ProcessingParametersTomo, + db: Session = murfey_db, ): session_processing_parameters = db.exec( select(SessionProcessingParameters).where( @@ -702,8 +715,10 @@ def register_tomo_proc_params( **dict(proc_params), "session_id": session_id, } - if _transport_object: - _transport_object.send(_transport_object.feedback_queue, zocalo_message) + if murfey.server._transport_object: + murfey.server._transport_object.send( + murfey.server._transport_object.feedback_queue, zocalo_message + ) class Source(BaseModel): @@ -714,7 +729,10 @@ class Source(BaseModel): "/visits/{visit_name}/sessions/{session_id}/flush_tomography_processing" ) def flush_tomography_processing( - visit_name: str, session_id: MurfeySessionID, rsync_source: Source, db=murfey_db + visit_name: str, + session_id: MurfeySessionID, + rsync_source: Source, + db: Session = murfey_db, ): zocalo_message = { "register": "flush_tomography_preprocess", @@ -722,8 +740,10 @@ def flush_tomography_processing( "visit_name": visit_name, "data_collection_group_tag": rsync_source.rsync_source, } - if _transport_object: - _transport_object.send(_transport_object.feedback_queue, 
zocalo_message) + if murfey.server._transport_object: + murfey.server._transport_object.send( + murfey.server._transport_object.feedback_queue, zocalo_message + ) return @@ -735,7 +755,7 @@ class TiltSeriesInfo(BaseModel): @tomo_router.post("/visits/{visit_name}/tilt_series") def register_tilt_series( - visit_name: str, tilt_series_info: TiltSeriesInfo, db=murfey_db + visit_name: str, tilt_series_info: TiltSeriesInfo, db: Session = murfey_db ): session_id = tilt_series_info.session_id if db.exec( @@ -764,7 +784,7 @@ class TiltSeriesGroupInfo(BaseModel): def register_tilt_series_length( session_id: int, tilt_series_group: TiltSeriesGroupInfo, - db=murfey_db, + db: Session = murfey_db, ): tilt_series_db = db.exec( select(TiltSeries) @@ -802,7 +822,7 @@ async def request_tomography_preprocessing( visit_name: str, session_id: MurfeySessionID, proc_file: TomoProcessFile, - db=murfey_db, + db: Session = murfey_db, ): instrument_name = ( db.exec(select(Session).where(Session.id == session_id)).one().instrument_name @@ -888,11 +908,11 @@ async def request_tomography_preprocessing( "fm_int_file": proc_file.eer_fractionation_file, }, } - if _transport_object: + if murfey.server._transport_object: zocalo_message["parameters"]["feedback_queue"] = ( - _transport_object.feedback_queue + murfey.server._transport_object.feedback_queue ) - _transport_object.send("processing_recipe", zocalo_message) + murfey.server._transport_object.send("processing_recipe", zocalo_message) else: logger.error( f"Pre-processing was requested for {sanitise(Path(proc_file.path).name)} " @@ -919,7 +939,7 @@ def register_completed_tilt_series( visit_name: str, session_id: MurfeySessionID, tilt_series_group: TiltSeriesGroupInfo, - db=murfey_db, + db: Session = murfey_db, ): tilt_series_db = db.exec( select(TiltSeries) @@ -1009,9 +1029,9 @@ def register_completed_tilt_series( "y_location": ts.y_location, }, } - if _transport_object: + if murfey.server._transport_object: logger.info(f"Sending Zocalo message 
for processing: {zocalo_message}") - _transport_object.send( + murfey.server._transport_object.send( "processing_recipe", zocalo_message, new_connection=True ) else: @@ -1023,7 +1043,7 @@ def register_completed_tilt_series( @tomo_router.post("/visits/{visit_name}/rerun_tilt_series") def register_tilt_series_for_rerun( - visit_name: str, tilt_series_info: TiltSeriesInfo, db=murfey_db + visit_name: str, tilt_series_info: TiltSeriesInfo, db: Session = murfey_db ): """Set processing to false for cases where an extra tilt is found for a series""" session_id = tilt_series_info.session_id @@ -1048,7 +1068,10 @@ class TiltInfo(BaseModel): @tomo_router.post("/visits/{visit_name}/sessions/{session_id}/tilt") async def register_tilt( - visit_name: str, session_id: MurfeySessionID, tilt_info: TiltInfo, db=murfey_db + visit_name: str, + session_id: MurfeySessionID, + tilt_info: TiltInfo, + db: Session = murfey_db, ): def _add_tilt(): tilt_series_id = ( @@ -1141,8 +1164,8 @@ def get_samples(visit_name: str, db=ispyb_db) -> List[Sample]: def register_sample_group(visit_name: str, db=ispyb_db) -> dict: proposal_id = get_proposal_id(visit_name[:2], visit_name.split("-")[0][2:], db=db) record = BLSampleGroup(proposalId=proposal_id) - if _transport_object: - return _transport_object.do_insert_sample_group(record) + if murfey.server._transport_object: + return murfey.server._transport_object.do_insert_sample_group(record) return {"success": False} @@ -1153,8 +1176,10 @@ class BLSampleParameters(BaseModel): @correlative_router.post("/visit/{visit_name}/sample") def register_sample(visit_name: str, sample_params: BLSampleParameters) -> dict: record = BLSample() - if _transport_object: - return _transport_object.do_insert_sample(record, sample_params.sample_group_id) + if murfey.server._transport_object: + return murfey.server._transport_object.do_insert_sample( + record, sample_params.sample_group_id + ) return {"success": False} @@ -1170,8 +1195,8 @@ def register_subsample( record = 
BLSubSample( blSampleId=subsample_params.sample_id, imgFilePath=subsample_params.image_path ) - if _transport_object: - return _transport_object.do_insert_subsample(record) + if murfey.server._transport_object: + return murfey.server._transport_object.do_insert_subsample(record) return {"success": False} @@ -1188,8 +1213,8 @@ def register_sample_image( blSampleId=sample_image_params.sample_id, imageFullPath=sample_image_params.image_path, ) - if _transport_object: - return _transport_object.do_insert_sample_image(record) + if murfey.server._transport_object: + return murfey.server._transport_object.do_insert_sample_image(record) return {"success": False} @@ -1207,7 +1232,7 @@ async def make_gif( visit_name: str, session_id: int, gif_params: MillingParameters, - db=murfey_db, + db: Session = murfey_db, ): instrument_name = ( db.exec(select(Session).where(Session.id == session_id)).one().instrument_name diff --git a/src/murfey/server/feedback.py b/src/murfey/server/feedback.py index 1fc0c635f..316597406 100644 --- a/src/murfey/server/feedback.py +++ b/src/murfey/server/feedback.py @@ -22,6 +22,8 @@ import mrcfile import numpy as np +from gemmi import cif +from pipeliner.star_keys import GENERAL_BLOCK, JOB_COUNTER from sqlalchemy import func from sqlalchemy.exc import ( InvalidRequestError, @@ -50,6 +52,40 @@ logger = logging.getLogger("murfey.server.feedback") +def _current_pipeline_job_counter(visit_name: str) -> int: + """Return the next jobNNN Pipeliner will allocate for visit_name. + + Reads the JOB_COUNTER value from default_pipeline.star so that + SPA feedback decisions are anchored to Pipeliner's actual state instead + of an independent integer counter that drifts. 
+ + Falls back to 7 (previous default) if the file is + missing — this preserves the previous behaviour for non Doppio runs + """ + pipeline_file = Path(visit_name) / "default_pipeline.star" + if not pipeline_file.is_file(): + return 7 + try: + dp = cif.read_file(str(pipeline_file)) + block = dp.find_block(GENERAL_BLOCK) + if block is None: + return 7 + return int(block.find_value(JOB_COUNTER)) + except Exception: + logger.warning( + "Failed to read JOB_COUNTER from %s — falling back to legacy job number", + pipeline_file, + exc_info=True, + ) + return 7 + + +def _visit_name_for_session(session_id: int, _db) -> str: + """Return the visit (project directory) for a Murfey session id.""" + session_row = _db.exec(select(db.Session).where(db.Session.id == session_id)).one() + return session_row.visit + + try: _url = url(get_security_config()) engine = create_engine(_url) @@ -373,9 +409,8 @@ def _release_2d_hold(message: dict, _db): "recipes": ["em-spa-class2d"], } if first_class2d.complete: - feedback_params.next_job += ( - 4 if default_spa_parameters.do_icebreaker_jobs else 3 - ) + visit_name = _visit_name_for_session(message["session_id"], _db) + feedback_params.next_job = _current_pipeline_job_counter(visit_name) feedback_params.rerun_class2d = False _db.add(feedback_params) if first_class2d.complete: @@ -583,7 +618,9 @@ def _register_incomplete_2d_batch(message: dict, _db): _db.commit() _db.close() return - feedback_params.next_job = 10 if default_spa_parameters.do_icebreaker_jobs else 7 + # Get next_job from the actual Pipeliner counter + visit_name = _visit_name_for_session(message["session_id"], _db) + feedback_params.next_job = _current_pipeline_job_counter(visit_name) feedback_params.hold_class2d = True relion_options = dict(relion_params) other_options = dict(feedback_params) @@ -725,15 +762,8 @@ def _register_complete_2d_batch(message: dict, _db): murfey_ids, class2d_message["particles_file"], _app_id(pj_id, _db), _db ) elif not 
feedback_params.class_selection_score: - # For the first batch, start a container and set the database to wait - job_number_after_first_batch = ( - 10 if default_spa_parameters.do_icebreaker_jobs else 7 - ) - if ( - feedback_params.next_job is not None - and feedback_params.next_job < job_number_after_first_batch - ): - feedback_params.next_job = job_number_after_first_batch + visit_name = _visit_name_for_session(message["session_id"], _db) + feedback_params.next_job = _current_pipeline_job_counter(visit_name) if not feedback_params.star_combination_job: feedback_params.star_combination_job = feedback_params.next_job + ( 3 if default_spa_parameters.do_icebreaker_jobs else 2 @@ -804,14 +834,14 @@ def _register_complete_2d_batch(message: dict, _db): "processing_recipe", zocalo_message, new_connection=True ) feedback_params.hold_class2d = True - feedback_params.next_job += ( - 4 if default_spa_parameters.do_icebreaker_jobs else 3 - ) + # next_job is re-anchored from Pipeliner on the next entry to this + # function — no manual increment needed. 
_db.add(feedback_params) _db.commit() _db.close() else: - # Send all other messages on to a container + visit_name = _visit_name_for_session(message["session_id"], _db) + feedback_params.next_job = _current_pipeline_job_counter(visit_name) if _db.exec( select(func.count(db.Class2DParameters.particles_file)) .where(db.Class2DParameters.pj_id == pj_id) @@ -877,9 +907,7 @@ def _register_complete_2d_batch(message: dict, _db): murfey.server._transport_object.send( "processing_recipe", zocalo_message, new_connection=True ) - feedback_params.next_job += ( - 3 if default_spa_parameters.do_icebreaker_jobs else 2 - ) + feedback_params.hold_class2d = False _db.add(feedback_params) _db.commit() _db.close() @@ -924,10 +952,9 @@ def _flush_class2d( .where(db.Class2DParameters.pj_id == pj_id) .where(db.Class2DParameters.complete) ).all() - if not feedback_params.next_job: - feedback_params.next_job = ( - 10 if default_spa_parameters.do_icebreaker_jobs else 7 - ) + # Check pipeliner counter + visit_name = _visit_name_for_session(session_id, _db) + feedback_params.next_job = _current_pipeline_job_counter(visit_name) if not feedback_params.star_combination_job: feedback_params.star_combination_job = feedback_params.next_job + ( 3 if default_spa_parameters.do_icebreaker_jobs else 2 @@ -1173,6 +1200,10 @@ def _register_3d_batch(message: dict, _db): .visit ) + # Check Pipeliner's job counter + feedback_params.next_job = _current_pipeline_job_counter(visit_name) + other_options["next_job"] = feedback_params.next_job + provided_initial_model = _find_initial_model(visit_name, machine_config) if provided_initial_model and not feedback_params.initial_model: rescaled_initial_model_path = ( @@ -1195,7 +1226,6 @@ def _register_3d_batch(message: dict, _db): class3d_dir = ( f"{class3d_message['class3d_dir']}{(feedback_params.next_job + 1):03}" ) - feedback_params.next_job += 1 _db.add(feedback_params) _db.commit() @@ -1230,7 +1260,6 @@ def _register_3d_batch(message: dict, _db): _db.close() elif 
not feedback_params.initial_model: # For the first batch, start a container and set the database to wait - next_job = feedback_params.next_job class3d_dir = ( f"{class3d_message['class3d_dir']}{(feedback_params.next_job + 1):03}" ) @@ -1250,8 +1279,6 @@ def _register_3d_batch(message: dict, _db): ) feedback_params.hold_class3d = True - next_job += 2 - feedback_params.next_job = next_job zocalo_message: dict = { "parameters": { "particles_file": class3d_message["particles_file"], @@ -1510,6 +1537,11 @@ def _register_refinement(message: dict, _db): ) ).one() + # Re-anchor next_job to Pipeliner's actual counter so the predicted + # Refine3D / MaskCreate / PostProcess slots line up with reality. + visit_name = _visit_name_for_session(message["session_id"], _db) + feedback_params.next_job = _current_pipeline_job_counter(visit_name) + if feedback_params.hold_refine: # If waiting then save the message refine_params = _db.exec( @@ -1538,7 +1570,6 @@ def _register_refinement(message: dict, _db): .where(db.RefineParameters.tag == "symmetry") ).one() except SQLAlchemyError: - next_job = feedback_params.next_job refine_dir = f"{message['refine_dir']}{(feedback_params.next_job + 2):03}" refined_grp_uuid = _murfey_id(message["program_id"], _db)[0] refined_class_uuid = _murfey_id(message["program_id"], _db)[0] @@ -1579,14 +1610,6 @@ def _register_refinement(message: dict, _db): _db=_db, ) - if relion_options["symmetry"] == "C1": - # Extra Refine, Mask, PostProcess beyond for determined symmetry - next_job += 8 - else: - # Select and Extract particles, then Refine, Mask, PostProcess - next_job += 5 - feedback_params.next_job = next_job - zocalo_message: dict = { "parameters": { "refine_job_dir": refine_params.refine_dir, diff --git a/src/murfey/server/ispyb.py b/src/murfey/server/ispyb.py index 815f06354..5349e44d0 100644 --- a/src/murfey/server/ispyb.py +++ b/src/murfey/server/ispyb.py @@ -41,18 +41,22 @@ log = logging.getLogger("murfey.server.ispyb") security_config = 
get_security_config() -try: - ISPyBSession = sessionmaker( - bind=create_engine( - url(credentials=security_config.ispyb_credentials), - connect_args={"use_pure": True}, - pool_recycle=250, +if security_config.ispyb_credentials: + try: + ISPyBSession = sessionmaker( + bind=create_engine( + url(credentials=security_config.ispyb_credentials), + connect_args={"use_pure": True}, + pool_recycle=250, + ) ) - ) - log.info("Loaded ISPyB database session") -# Catch all errors associated with loading ISPyB database -except Exception: - log.error("Error loading ISPyB session", exc_info=True) + log.info("Loaded ISPyB database session") + # Catch all errors associated with loading ISPyB database + except Exception: + log.error("Error loading ISPyB session", exc_info=True) + ISPyBSession = lambda: None +else: + log.info("No ISPyB credentials set, using local database") ISPyBSession = lambda: None diff --git a/src/murfey/util/config.py b/src/murfey/util/config.py index 5e50d7c22..060ec65bf 100644 --- a/src/murfey/util/config.py +++ b/src/murfey/util/config.py @@ -237,7 +237,7 @@ class Security(BaseModel): # Murfey server connection settings auth_url: str = "" - auth_type: Literal["password", "cookie"] = "password" + auth_type: Literal["password", "cookie", "none"] = "password" auth_algorithm: str = "" auth_key: str = "" cookie_key: str = "" diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index 5ba6691d6..5a4358445 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -704,7 +704,6 @@ class ClassificationFeedbackParameters(SQLModel, table=True): # type: ignore star_combination_job: int initial_model: str next_job: int - picker_murfey_id: Optional[int] = Field(default=None, foreign_key="murfeyledger.id") processing_job: Optional[ProcessingJob] = Relationship( back_populates="classification_feedback_parameters" ) diff --git a/src/murfey/util/processing_params.py b/src/murfey/util/processing_params.py index 1ddb2b251..dbb660749 100644 --- 
a/src/murfey/util/processing_params.py +++ b/src/murfey/util/processing_params.py @@ -1,35 +1,94 @@ +import logging +import os from datetime import datetime from functools import lru_cache from pathlib import Path +from pipeliner.project_graph import ProjectGraph from pydantic import BaseModel from werkzeug.utils import secure_filename from murfey.util.config import MachineConfig, get_machine_config +logger = logging.getLogger("murfey.util.processing_params") + + +_DEFAULT_MOTIONCORR_FALLBACK = "job002" + + +@lru_cache(maxsize=16) +def _job_dir_for_alias_cached(visit_name: str, alias: str, mtime_ns: int) -> str | None: + """Read default_pipeline.star and return the jobNNN for the given alias. + + Returns None on any failure (missing file, graph read error, alias + not found). The mtime_ns argument is a cache key — when Pipeliner rewrites + default_pipeline.star its mtime changes and the next call falls through + to a fresh read. + """ + project_dir = Path(visit_name) + pipeline_file = project_dir / "default_pipeline.star" + if not pipeline_file.is_file(): + return None + try: + with ProjectGraph(pipeline_dir=project_dir, read_only=True) as graph: + for proc in graph.process_list: + proc_alias = getattr(proc, "alias", None) + if proc_alias and proc_alias.rstrip("/").endswith(alias): + # proc.name is e.g. "MotionCorr/job003/" + return Path(proc.name).name + except Exception: + logger.error( + "ProjectGraph read failed while looking up alias %r in %s", + alias, + pipeline_file, + exc_info=True, + ) + return None + return None + + +def _job_dir_for_alias(visit_name: str, alias: str) -> str: + """Return the Pipeliner jobNNN for alias in the given project. + + visit_name must be a path to the project directory. + Falls back to the positional default job002 and logs a warning so + drift from the live pipeline is visible in the logs instead of silent.
+ """ + project_dir = Path(visit_name).resolve() + pipeline_file = project_dir / "default_pipeline.star" + try: + mtime_ns = pipeline_file.stat().st_mtime_ns + except FileNotFoundError: + logger.warning( + "default_pipeline.star missing at %s — falling back to %s for alias %r", + pipeline_file, + _DEFAULT_MOTIONCORR_FALLBACK, + alias, + ) + return _DEFAULT_MOTIONCORR_FALLBACK + job_dir = _job_dir_for_alias_cached(str(project_dir), alias, mtime_ns) + if job_dir is None: + logger.warning( + "Alias %r not found in %s — falling back to %s", + alias, + pipeline_file, + _DEFAULT_MOTIONCORR_FALLBACK, + ) + return _DEFAULT_MOTIONCORR_FALLBACK + return job_dir + def motion_corrected_mrc( input_movie: Path, visit_name: str, machine_config: MachineConfig ): - parts = [secure_filename(p) for p in input_movie.parts] - visit_idx = parts.index(visit_name) - core = Path("/") / Path(*parts[: visit_idx + 1]) - ppath = Path("/") / Path(*parts) - if machine_config.process_multiple_datasets: - sub_dataset = ppath.relative_to(core).parts[0] - else: - sub_dataset = "" - extra_path = machine_config.processed_extra_directory + movie = os.path.basename(input_movie) + job_dir = _job_dir_for_alias(visit_name, "Live_motioncorr") mrc_out = ( - core - / machine_config.processed_directory_name - / sub_dataset - / extra_path + Path(visit_name) / "MotionCorr" - / "job002" + / job_dir / "Movies" - / ppath.parent.relative_to(core / sub_dataset) - / str(ppath.stem + "_motion_corrected.mrc") + / str(movie + "_motion_corrected.mrc") ) return Path("/".join(secure_filename(p) for p in mrc_out.parts)) diff --git a/src/murfey/workflows/register_data_collection.py b/src/murfey/workflows/register_data_collection.py index bedaf63d5..3a83ecf03 100644 --- a/src/murfey/workflows/register_data_collection.py +++ b/src/murfey/workflows/register_data_collection.py @@ -6,7 +6,7 @@ from sqlmodel.orm.session import Session as SQLModelSession import murfey.util.db as MurfeyDB -from murfey.server import _transport_object 
+import murfey.server from murfey.server.ispyb import ISPyBSession, get_session_id from murfey.util import sanitise @@ -15,7 +15,7 @@ def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]: # Fail immediately if transport manager was not provided - if _transport_object is None: + if murfey.server._transport_object is None: logger.error("Unable to find transport manager") return {"success": False, "requeue": False} @@ -76,7 +76,7 @@ def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]: c2aperture=message.get("c2aperture"), phasePlate=int(message.get("phase_plate", 0)), ) - dcid = _transport_object.do_insert_data_collection( + dcid = murfey.server._transport_object.do_insert_data_collection( record, tag=( message.get("tag") diff --git a/src/murfey/workflows/register_data_collection_group.py b/src/murfey/workflows/register_data_collection_group.py index 0908e769b..dae2111a8 100644 --- a/src/murfey/workflows/register_data_collection_group.py +++ b/src/murfey/workflows/register_data_collection_group.py @@ -7,7 +7,7 @@ from sqlmodel import select from sqlmodel.orm.session import Session as SQLModelSession -from murfey.server import _transport_object +import murfey.server from murfey.server.ispyb import ISPyBSession, get_session_id from murfey.util.db import DataCollectionGroup @@ -16,7 +16,7 @@ def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]: # Fail immediately if no transport wrapper is found - if _transport_object is None: + if murfey.server._transport_object is None: logger.error("Unable to find transport manager") return {"success": False, "requeue": False} @@ -50,9 +50,9 @@ def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]: experimentTypeId=message["experiment_type_id"], ) - dcgid = _transport_object.do_insert_data_collection_group(record).get( - "return_value", None - ) + dcgid = murfey.server._transport_object.do_insert_data_collection_group( + record + ).get("return_value", None) if dcgid 
is None: time.sleep(2) @@ -75,9 +75,9 @@ def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]: if color_flags := message.get("color_flags", {}): for col_name, value in color_flags.items(): setattr(atlas_record, col_name, value) - atlas_id = _transport_object.do_insert_atlas(atlas_record).get( - "return_value", None - ) + atlas_id = murfey.server._transport_object.do_insert_atlas( + atlas_record + ).get("return_value", None) murfey_dcg = DataCollectionGroup( id=dcgid, diff --git a/src/murfey/workflows/register_processing_job.py b/src/murfey/workflows/register_processing_job.py index 1bb2d5f52..b6ab7b5a5 100644 --- a/src/murfey/workflows/register_processing_job.py +++ b/src/murfey/workflows/register_processing_job.py @@ -7,7 +7,7 @@ import murfey.server.prometheus as prom import murfey.util.db as MurfeyDB -from murfey.server import _transport_object +import murfey.server from murfey.server.ispyb import ISPyBSession from murfey.util import sanitise @@ -16,7 +16,7 @@ def run(message: dict, murfey_db: SQLModelSession): # Faill immediately if not transport manager is set - if _transport_object is None: + if murfey.server._transport_object is None: logger.error("Unable to find transport manager") return {"success": False, "requeue": False} @@ -56,11 +56,11 @@ def run(message: dict, murfey_db: SQLModelSession): ISPyBDB.ProcessingJobParameter(parameterKey=k, parameterValue=v) for k, v in message["job_parameters"].items() ] - pid = _transport_object.do_create_ispyb_job( + pid = murfey.server._transport_object.do_create_ispyb_job( record, params=job_parameters ).get("return_value", None) else: - pid = _transport_object.do_create_ispyb_job(record).get( + pid = murfey.server._transport_object.do_create_ispyb_job(record).get( "return_value", None ) if pid is None: @@ -86,7 +86,7 @@ def run(message: dict, murfey_db: SQLModelSession): record = ISPyBDB.AutoProcProgram( processingJobId=pid, processingStartTime=datetime.now() ) - appid = 
_transport_object.do_update_processing_status(record).get( + appid = murfey.server._transport_object.do_update_processing_status(record).get( "return_value", None ) if appid is None: diff --git a/src/murfey/workflows/spa/flush_spa_preprocess.py b/src/murfey/workflows/spa/flush_spa_preprocess.py index e3d52d614..f7a19bed4 100644 --- a/src/murfey/workflows/spa/flush_spa_preprocess.py +++ b/src/murfey/workflows/spa/flush_spa_preprocess.py @@ -18,13 +18,12 @@ except ImportError: SMARTEM_ACTIVE = False -from murfey.server import _transport_object +import murfey.server from murfey.server.feedback import _murfey_id from murfey.util import sanitise, secure_path from murfey.util.config import get_machine_config, get_microscope from murfey.util.db import ( AutoProcProgram, - ClassificationFeedbackParameters, DataCollection, DataCollectionGroup, FoilHole, @@ -96,17 +95,19 @@ def register_grid_square( ) grid_square.pixel_size = grid_square_params.pixel_size or grid_square.pixel_size grid_square.image = grid_square_params.image or grid_square.image - if _transport_object: - _transport_object.do_update_grid_square(grid_square.id, grid_square_params) + if murfey.server._transport_object: + murfey.server._transport_object.do_update_grid_square( + grid_square.id, grid_square_params + ) else: # No existing grid square in the murfey database - if _transport_object: + if murfey.server._transport_object: dcg = murfey_db.exec( select(DataCollectionGroup) .where(DataCollectionGroup.session_id == session_id) .where(DataCollectionGroup.tag == grid_square_params.tag) ).one() - gs_ispyb_response = _transport_object.do_insert_grid_square( + gs_ispyb_response = murfey.server._transport_object.do_insert_grid_square( dcg.atlas_id, gsid, grid_square_params ) else: @@ -164,11 +165,15 @@ def register_grid_square( ".tiff" ).is_file(): secured_grid_square_image_path_full_res = ( - secured_grid_square_image_path_full_res.with_suffix(".tiff") + secured_grid_square_image_path_full_res.with_suffix( + 
".tiff" + ) ) else: secured_grid_square_image_path_full_res = ( - secured_grid_square_image_path_full_res.with_suffix(".mrc") + secured_grid_square_image_path_full_res.with_suffix( + ".mrc" + ) ) smartem_client = SmartEMAPIClient( base_url=machine_config.smartem_api_url, logger=logger @@ -270,14 +275,14 @@ def register_foil_hole( foil_hole_params.thumbnail_size_y or foil_hole.thumbnail_size_y ) or jpeg_size[1] foil_hole.pixel_size = foil_hole_params.pixel_size or foil_hole.pixel_size - if _transport_object and gs.readout_area_x: - _transport_object.do_update_foil_hole( + if murfey.server._transport_object and gs.readout_area_x: + murfey.server._transport_object.do_update_foil_hole( foil_hole.id, gs.thumbnail_size_x / gs.readout_area_x, foil_hole_params ) else: # No existing foil hole in the murfey database - if _transport_object: - fh_ispyb_response = _transport_object.do_insert_foil_hole( + if murfey.server._transport_object: + fh_ispyb_response = murfey.server._transport_object.do_insert_foil_hole( gs.id, gs.thumbnail_size_x / gs.readout_area_x if gs.readout_area_x else None, foil_hole_params, @@ -491,18 +496,11 @@ def flush_spa_preprocess(message: dict, murfey_db: Session) -> dict[str, bool]: .where(AutoProcProgram.pj_id == ProcessingJob.id) .where(ProcessingJob.recipe == recipe_name) ).one() - params = murfey_db.exec( - select(SPARelionParameters, ClassificationFeedbackParameters) - .where(SPARelionParameters.pj_id == collected_ids[2].id) - .where(ClassificationFeedbackParameters.pj_id == SPARelionParameters.pj_id) - ).one() - proc_params = params[0] - feedback_params = params[1] - if not proc_params: - logger.warning( - f"No SPA processing parameters found for client processing job ID {collected_ids[2].id}" + proc_params = murfey_db.exec( + select(SPARelionParameters).where( + SPARelionParameters.pj_id == collected_ids[2].id ) - return {"success": False, "requeue": False} + ).one() murfey_ids = _murfey_id( collected_ids[3].id, @@ -510,10 +508,6 @@ def 
flush_spa_preprocess(message: dict, murfey_db: Session) -> dict[str, bool]: number=2 * len(stashed_files), close=False, ) - if feedback_params.picker_murfey_id is None: - feedback_params.picker_murfey_id = murfey_ids[1] - murfey_db.add(feedback_params) - for i, f in enumerate(stashed_files): try: foil_hole_id = None @@ -585,11 +579,11 @@ def flush_spa_preprocess(message: dict, murfey_db: Session) -> dict[str, bool]: "foil_hole_id": foil_hole_id, }, } - if _transport_object: + if murfey.server._transport_object: zocalo_message["parameters"]["feedback_queue"] = ( - _transport_object.feedback_queue + murfey.server._transport_object.feedback_queue ) - _transport_object.send( + murfey.server._transport_object.send( "processing_recipe", zocalo_message, new_connection=True ) murfey_db.delete(f) diff --git a/src/murfey/workflows/spa/picking.py b/src/murfey/workflows/spa/picking.py index f7c915d56..da542d17a 100644 --- a/src/murfey/workflows/spa/picking.py +++ b/src/murfey/workflows/spa/picking.py @@ -5,8 +5,8 @@ from sqlalchemy import func from sqlmodel import Session, select +import murfey.server import murfey.server.prometheus as prom -from murfey.server import _transport_object from murfey.server.feedback import ( _app_id, _pj_id, @@ -127,11 +127,11 @@ def _register_picked_particles_use_diameter(message: dict, _db: Session): }, "recipes": ["em-spa-extract"], } - if _transport_object: + if murfey.server._transport_object: zocalo_message["parameters"]["feedback_queue"] = ( - _transport_object.feedback_queue + murfey.server._transport_object.feedback_queue ) - _transport_object.send( + murfey.server._transport_object.send( "processing_recipe", zocalo_message, new_connection=True ) else: @@ -167,11 +167,11 @@ def _register_picked_particles_use_diameter(message: dict, _db: Session): }, "recipes": ["em-spa-extract"], } - if _transport_object: + if murfey.server._transport_object: zocalo_message["parameters"]["feedback_queue"] = ( - _transport_object.feedback_queue + 
murfey.server._transport_object.feedback_queue ) - _transport_object.send( + murfey.server._transport_object.send( "processing_recipe", zocalo_message, new_connection=True ) @@ -249,11 +249,13 @@ def _register_picked_particles_use_boxsize(message: dict, _db: Session): }, "recipes": ["em-spa-extract"], } - if _transport_object: + if murfey.server._transport_object: zocalo_message["parameters"]["feedback_queue"] = ( - _transport_object.feedback_queue + murfey.server._transport_object.feedback_queue + ) + murfey.server._transport_object.send( + "processing_recipe", zocalo_message, new_connection=True ) - _transport_object.send("processing_recipe", zocalo_message, new_connection=True) _db.close() @@ -266,8 +268,8 @@ def _request_email( config = get_machine_config(instrument_name=session.instrument_name)[ session.instrument_name ] - if _transport_object: - _transport_object.send( + if murfey.server._transport_object: + murfey.server._transport_object.send( config.notifications_queue, { "groupId": dcg_id,