From f902935d39289bb58cfef4ce46852d87c552c703 Mon Sep 17 00:00:00 2001 From: dixilo Date: Wed, 15 Apr 2026 14:48:20 +0900 Subject: [PATCH 1/2] Add stimulator functionality for flexible operation --- src/sorunlib/stimulator.py | 174 +++++++++++++++++++++++++++++-------- tests/test_stimulator.py | 112 ++++++++++++++++++++++++ 2 files changed, 252 insertions(+), 34 deletions(-) diff --git a/src/sorunlib/stimulator.py b/src/sorunlib/stimulator.py index 0bc775a..d77e049 100644 --- a/src/sorunlib/stimulator.py +++ b/src/sorunlib/stimulator.py @@ -1,11 +1,15 @@ import time import sorunlib as run from sorunlib._internal import check_response, protect_shutdown, stop_smurfs - ID_SHUTTER = 1 +_VOLT_ULIM_SOFT = 51 + +_VOLT_STEP = 1 +_VOLT_ALLOW = 1.2 +_RAMP_TIME_STEP = 7 -def _open_shutter(): +def open_shutter(): """Open the shutter of the stimulator""" ds = run.CLIENTS['stimulator']['ds378'] resp = ds.set_relay(relay_number=ID_SHUTTER, on_off=1) @@ -14,7 +18,7 @@ def _open_shutter(): time.sleep(3) -def _close_shutter(): +def close_shutter(): """Close the shutter of the stimulator""" ds = run.CLIENTS['stimulator']['ds378'] resp = ds.set_relay(relay_number=ID_SHUTTER, on_off=0) @@ -23,9 +27,30 @@ def _close_shutter(): time.sleep(3) +def rotate(speed_rpm, forward=True, start=True): + """Set the chopper rotation speed and start rotation. + + Parameters + ---------- + speed_rpm : float + Rotation speed in RPM. + forward : bool, optional + If True, the chopper rotates clockwise when viewed from the receiver + toward the stimulator. Defaults to True. + start : bool, optional + If True, start the rotation after setting the speed. Defaults to True. + """ + blh = run.CLIENTS['stimulator']['blh'] + resp = blh.set_values(speed=speed_rpm) + check_response(blh, resp) + if start: + resp = blh.start_rotation(forward=forward) + check_response(blh, resp) + + def _setup(): # Open shutter - _open_shutter() + open_shutter() # Acceleration / Decceleration configuration blh = run.CLIENTS['stimulator']['blh'] @@ -33,17 +58,117 @@ def _setup(): check_response(blh, resp) -@protect_shutdown -def _stop(): +def stop_rotation(): + """Stop the chopper rotation.""" blh = run.CLIENTS['stimulator']['blh'] - - # Stop rotation resp = blh.stop_rotation() check_response(blh, resp) time.sleep(10) + +@protect_shutdown +def _stop(): + stop_rotation() + # Close shutter - _close_shutter() + close_shutter() + + +def set_heater_voltage(volt, force=False): + """Set the PCR500MA source voltage to the given value immediately. + + Parameters + ---------- + volt : float + Target voltage in V. + force : bool, optional + If True, skip the discrepancy check between the target and measured + voltages. Defaults to False. + """ + assert volt < _VOLT_ULIM_SOFT, f'Target voltage exceeds {_VOLT_ULIM_SOFT}V.' + + pcr = run.CLIENTS['stimulator']['pcr500ma'] + + if not force: + response = pcr.acq.status() + v_meas = response.session['data']['V_AC'] + if abs(volt - v_meas) > _VOLT_ALLOW: + raise RuntimeError( + f'Discrepancy between target and measured voltages: ' + f'{volt} / {v_meas}' + ) + + resp = pcr.set_volt_ac(volt_set=volt) + check_response(pcr, resp) + + +def set_to_0V_heater(): + """Set the PCR500MA source voltage to 0V when output is OFF. + + This is intended for resetting the voltage setpoint while the output is + disabled, bypassing the normal ramp procedure. + """ + pcr = run.CLIENTS['stimulator']['pcr500ma'] + + if pcr.get_output().session['data']['output']: + raise RuntimeError('Output is ON. Use ramp_heater to ramp down to 0V.') + + set_heater_voltage(0, force=True) + + +def ramp_heater(volt, vstep=_VOLT_STEP): + """Ramp the PCR500MA source voltage to the target value. + + Parameters + ---------- + volt : float + Target voltage in V. + vstep : float, optional + Voltage step for ramping. Defaults to 1 V. + """ + assert volt < _VOLT_ULIM_SOFT, f'Target voltage exceeds {_VOLT_ULIM_SOFT}V.' + + pcr = run.CLIENTS['stimulator']['pcr500ma'] + + # Health check + _, _, s_meas = pcr.get_volt_ac() + v_target_tmp = s_meas['data']['volt_set'] + + if not pcr.get_output().session['data']['output']: + raise RuntimeError('Output on/off status is OFF') + + # Voltage plan + if v_target_tmp < volt: + n_step = int((volt - v_target_tmp) / vstep) + v_plan = [v_target_tmp + i * vstep for i in range(n_step)] + elif v_target_tmp > volt: + n_step = int((v_target_tmp - volt) / vstep) + v_plan = [v_target_tmp - i * vstep for i in range(n_step)] + else: + return + + v_plan = v_plan[1:] + [volt] + + for _v in v_plan: + time.sleep(0.5) + set_heater_voltage(_v) + time.sleep(_RAMP_TIME_STEP - 0.5) + + +def set_heater_output(output, force=False): + """Turn on or off the PCR500MA source output. + + Parameters + ---------- + output : bool + True to turn on, False to turn off. + force : bool, optional + If True, forcibly set the output bypassing safety checks. + Defaults to False. + """ + pcr = run.CLIENTS['stimulator']['pcr500ma'] + resp = pcr.set_output(output=output, force=force) + check_response(pcr, resp) def calibrate_tau(duration_step=20, @@ -77,7 +202,6 @@ def calibrate_tau(duration_step=20, If None is passed, will be (63/200)*sampling_rate. """ - blh = run.CLIENTS['stimulator']['blh'] downsample_factor = int(downsample_factor) try: @@ -102,21 +226,14 @@ def calibrate_tau(duration_step=20, if do_setup: _setup() - # Rotation setting - resp = blh.set_values(speed=speeds_rpm[0]) - check_response(blh, resp) - - resp = blh.start_rotation(forward=forward) - check_response(blh, resp) - + rotate(speeds_rpm[0], forward=forward) speeds_rpm = speeds_rpm[1:] # First data point time.sleep(duration_step) for speed_rpm in speeds_rpm: - resp = blh.set_values(speed=speed_rpm) - check_response(blh, resp) + rotate(speed_rpm, start=False) time.sleep(duration_step) finally: @@ -157,18 +274,14 @@ def calibrate_gain(duration=60, speed_rpm=90, If None is passed, will be (63/200)*sampling_rate. """ - blh = run.CLIENTS['stimulator']['blh'] downsample_factor = int(downsample_factor) try: - resp = blh.set_values(speed=speed_rpm) - check_response(blh, resp) + rotate(speed_rpm, start=False) if do_setup: _setup() - # Rotation setting - resp = blh.start_rotation(forward=forward) - check_response(blh, resp) + rotate(speed_rpm, forward=forward) # Sleep for rotation stabilization time.sleep(10) @@ -234,18 +347,12 @@ def calibrate_gain_tau(duration_gain=60, duration_tau=10, duration_stabilization If None is passed, will be (63/200)*sampling_rate. """ - blh = run.CLIENTS['stimulator']['blh'] downsample_factor = int(downsample_factor) try: # Shutter and chopper setup - resp = blh.set_values(speed=speed_rpm_gain) - check_response(blh, resp) _setup() - - # Rotation setting - resp = blh.start_rotation(forward=forward) - check_response(blh, resp) + rotate(speed_rpm_gain, forward=forward) # Sleep for rotation stabilization time.sleep(duration_stabilization) @@ -272,8 +379,7 @@ def calibrate_gain_tau(duration_gain=60, duration_tau=10, duration_stabilization time.sleep(duration_gain) for speed_rpm in speeds_rpm_tau: - resp = blh.set_values(speed=speed_rpm) - check_response(blh, resp) + rotate(speed_rpm, start=False) time.sleep(duration_stabilization) time.sleep(duration_tau) diff --git a/tests/test_stimulator.py b/tests/test_stimulator.py index 26514a4..0c5e686 100644 --- a/tests/test_stimulator.py +++ b/tests/test_stimulator.py @@ -14,6 +14,13 @@ patch_clients_lat = create_patch_clients('lat') +def _configure_pcr(pcr, volt_set=0, v_ac=0, output=True): + """Configure PCR mock return values.""" + pcr.get_volt_ac.return_value = (None, None, {'data': {'volt_set': volt_set}}) + pcr.acq.status.return_value.session = {'data': {'V_AC': v_ac}} + pcr.get_output.return_value.session = {'data': {'output': output}} + + @pytest.mark.parametrize("do_setup", [True, False]) @pytest.mark.parametrize("smurf_error", [True, False]) @patch('sorunlib.stimulator.time.sleep', MagicMock()) @@ -94,3 +101,108 @@ def test_calibrate_gain_tau(patch_clients_lat, smurf_error): # stop test stimulator.run.CLIENTS['stimulator']['blh'].stop_rotation.assert_called_with() stimulator.run.CLIENTS['stimulator']['ds378'].set_relay.assert_any_call(relay_number=1, on_off=0) + + +@pytest.mark.parametrize("method,on_off", [ + (stimulator.open_shutter, 1), + (stimulator.close_shutter, 0), +]) +@patch('sorunlib.stimulator.time.sleep', MagicMock()) +def test_shutter(patch_clients_lat, method, on_off): + method() + stimulator.run.CLIENTS['stimulator']['ds378'].set_relay.assert_called_with(relay_number=1, on_off=on_off) + + +@pytest.mark.parametrize("start", [True, False]) +@patch('sorunlib.stimulator.time.sleep', MagicMock()) +def test_rotate(patch_clients_lat, start): + stimulator.rotate(100, forward=True, start=start) + stimulator.run.CLIENTS['stimulator']['blh'].set_values.assert_called_with(speed=100) + if start: + stimulator.run.CLIENTS['stimulator']['blh'].start_rotation.assert_called_with(forward=True) + else: + stimulator.run.CLIENTS['stimulator']['blh'].start_rotation.assert_not_called() + + +@patch('sorunlib.stimulator.time.sleep', MagicMock()) +def test_stop_rotation(patch_clients_lat): + stimulator.stop_rotation() + stimulator.run.CLIENTS['stimulator']['blh'].stop_rotation.assert_called_with() + + +@pytest.mark.parametrize("volt,v_ac,force,check_called", [ + (10, 10, False, True), + (10, 0, True, False), +]) +@patch('sorunlib.stimulator.time.sleep', MagicMock()) +def test_set_heater_voltage(patch_clients_lat, volt, v_ac, force, check_called): + pcr = stimulator.run.CLIENTS['stimulator']['pcr500ma'] + _configure_pcr(pcr, v_ac=v_ac) + stimulator.set_heater_voltage(volt, force=force) + if check_called: + pcr.acq.status.assert_called() + else: + pcr.acq.status.assert_not_called() + pcr.set_volt_ac.assert_called_with(volt_set=volt) + + +@pytest.mark.parametrize("volt,v_ac,exc", [ + (10, 0, RuntimeError), + (51, 0, AssertionError), +]) +@patch('sorunlib.stimulator.time.sleep', MagicMock()) +def test_set_heater_voltage_error(patch_clients_lat, volt, v_ac, exc): + pcr = stimulator.run.CLIENTS['stimulator']['pcr500ma'] + _configure_pcr(pcr, v_ac=v_ac) + with pytest.raises(exc): + stimulator.set_heater_voltage(volt) + + +@pytest.mark.parametrize("output,raises", [ + (False, False), + (True, True), +]) +@patch('sorunlib.stimulator.time.sleep', MagicMock()) +def test_set_to_0V_heater(patch_clients_lat, output, raises): + pcr = stimulator.run.CLIENTS['stimulator']['pcr500ma'] + _configure_pcr(pcr, output=output) + if raises: + with pytest.raises(RuntimeError): + stimulator.set_to_0V_heater() + else: + stimulator.set_to_0V_heater() + pcr.set_volt_ac.assert_called_with(volt_set=0) + + +@patch('sorunlib.stimulator.time.sleep', MagicMock()) +def test_ramp_heater(patch_clients_lat): + pcr = stimulator.run.CLIENTS['stimulator']['pcr500ma'] + _configure_pcr(pcr, volt_set=0, output=True) + with patch('sorunlib.stimulator.set_heater_voltage') as mock_set_voltage: + stimulator.ramp_heater(3) + calls = [c.args[0] for c in mock_set_voltage.call_args_list] + assert calls == [1, 2, 3] + + +@pytest.mark.parametrize("volt,output,exc", [ + (3, False, RuntimeError), + (51, True, AssertionError), +]) +@patch('sorunlib.stimulator.time.sleep', MagicMock()) +def test_ramp_heater_error(patch_clients_lat, volt, output, exc): + pcr = stimulator.run.CLIENTS['stimulator']['pcr500ma'] + _configure_pcr(pcr, volt_set=0, output=output) + with pytest.raises(exc): + stimulator.ramp_heater(volt) + + +@pytest.mark.parametrize("output,force", [ + (True, False), + (False, False), + (False, True), +]) +@patch('sorunlib.stimulator.time.sleep', MagicMock()) +def test_set_heater_output(patch_clients_lat, output, force): + pcr = stimulator.run.CLIENTS['stimulator']['pcr500ma'] + stimulator.set_heater_output(output, force=force) + pcr.set_output.assert_called_with(output=output, force=force) From a12e918314f8ef48217c7db5cb2a6585ed13b25b Mon Sep 17 00:00:00 2001 From: dixilo Date: Fri, 17 Apr 2026 09:26:27 +0900 Subject: [PATCH 2/2] Added function to help recovering heater after power outage. --- src/sorunlib/stimulator.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/sorunlib/stimulator.py b/src/sorunlib/stimulator.py index d77e049..55ac695 100644 --- a/src/sorunlib/stimulator.py +++ b/src/sorunlib/stimulator.py @@ -171,6 +171,40 @@ def set_heater_output(output, force=False): check_response(pcr, resp) +def heater_recovery(volt, vstep=_VOLT_STEP): + """Recover the PCR500MA heater output to the target voltage after a power + outage or unexpected shutdown. + + Retrieves the current voltage setpoint and output state, then brings the + output back to ``volt`` via the following procedure: + + - Output ON : ramp directly to ``volt``. + - Output OFF, setpoint > 0 : reset setpoint to 0 V with + :func:`set_to_0V_heater`, turn output on, then ramp to ``volt``. + - Output OFF, setpoint == 0 : turn output on, then ramp to ``volt``. + + Parameters + ---------- + volt : float + Target voltage in V. + vstep : float, optional + Voltage step for ramping. Defaults to 1 V. + """ + pcr = run.CLIENTS['stimulator']['pcr500ma'] + + _, _, s_meas = pcr.get_volt_ac() + v_current = s_meas['data']['volt_set'] + output = pcr.get_output().session['data']['output'] + + if output: + ramp_heater(volt, vstep=vstep) + else: + if v_current > 0: + set_to_0V_heater() + set_heater_output(True) + ramp_heater(volt, vstep=vstep) + + def calibrate_tau(duration_step=20, speeds_rpm=[225, 495, 945, 1395, 1845, 2205], forward=True, do_setup=True, stop=True,