Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 174 additions & 34 deletions src/sorunlib/stimulator.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import time
import sorunlib as run
from sorunlib._internal import check_response, protect_shutdown, stop_smurfs

ID_SHUTTER = 1
_VOLT_ULIM_SOFT = 51

_VOLT_STEP = 1
_VOLT_ALLOW = 1.2
_RAMP_TIME_STEP = 7


def _open_shutter():
def open_shutter():
"""Open the shutter of the stimulator"""
ds = run.CLIENTS['stimulator']['ds378']
resp = ds.set_relay(relay_number=ID_SHUTTER, on_off=1)
Expand All @@ -14,7 +18,7 @@ def _open_shutter():
time.sleep(3)


def _close_shutter():
def close_shutter():
"""Close the shutter of the stimulator"""
ds = run.CLIENTS['stimulator']['ds378']
resp = ds.set_relay(relay_number=ID_SHUTTER, on_off=0)
Expand All @@ -23,27 +27,182 @@ def _close_shutter():
time.sleep(3)


def rotate(speed_rpm, forward=True, start=True):
"""Set the chopper rotation speed and start rotation.

Parameters
----------
speed_rpm : float
Rotation speed in RPM.
forward : bool, optional
If True, the chopper rotates clockwise when viewed from the receiver
toward the stimulator. Defaults to True.
start : bool, optional
If True, start the rotation after setting the speed. Defaults to True.
"""
blh = run.CLIENTS['stimulator']['blh']
resp = blh.set_values(speed=speed_rpm)
check_response(blh, resp)
if start:
resp = blh.start_rotation(forward=forward)
check_response(blh, resp)


def _setup():
# Open shutter
_open_shutter()
open_shutter()

# Acceleration / Decceleration configuration
blh = run.CLIENTS['stimulator']['blh']
resp = blh.set_values(accl_time=10, decl_time=10)
check_response(blh, resp)


@protect_shutdown
def _stop():
def stop_rotation():
"""Stop the chopper rotation."""
blh = run.CLIENTS['stimulator']['blh']

# Stop rotation
resp = blh.stop_rotation()
check_response(blh, resp)
time.sleep(10)


@protect_shutdown
def _stop():
stop_rotation()

# Close shutter
_close_shutter()
close_shutter()


def set_heater_voltage(volt, force=False):
"""Set the PCR500MA source voltage to the given value immediately.

Parameters
----------
volt : float
Target voltage in V.
force : bool, optional
If True, skip the discrepancy check between the target and measured
voltages. Defaults to False.
"""
assert volt < _VOLT_ULIM_SOFT, f'Target voltage exceeds {_VOLT_ULIM_SOFT}V.'

pcr = run.CLIENTS['stimulator']['pcr500ma']

if not force:
response = pcr.acq.status()
v_meas = response.session['data']['V_AC']
if abs(volt - v_meas) > _VOLT_ALLOW:
raise RuntimeError(
f'Discrepancy between target and measured voltages: '
f'{volt} / {v_meas}'
)

resp = pcr.set_volt_ac(volt_set=volt)
check_response(pcr, resp)


def set_to_0V_heater():
"""Set the PCR500MA source voltage to 0V when output is OFF.

This is intended for resetting the voltage setpoint while the output is
disabled, bypassing the normal ramp procedure.
"""
pcr = run.CLIENTS['stimulator']['pcr500ma']

if pcr.get_output().session['data']['output']:
raise RuntimeError('Output is ON. Use ramp_heater to ramp down to 0V.')

set_heater_voltage(0, force=True)


def ramp_heater(volt, vstep=_VOLT_STEP):
"""Ramp the PCR500MA source voltage to the target value.

Parameters
----------
volt : float
Target voltage in V.
vstep : float, optional
Voltage step for ramping. Defaults to 1 V.
"""
assert volt < _VOLT_ULIM_SOFT, f'Target voltage exceeds {_VOLT_ULIM_SOFT}V.'

pcr = run.CLIENTS['stimulator']['pcr500ma']

# Health check
_, _, s_meas = pcr.get_volt_ac()
v_target_tmp = s_meas['data']['volt_set']

if not pcr.get_output().session['data']['output']:
raise RuntimeError('Output on/off status is OFF')

# Voltage plan
if v_target_tmp < volt:
n_step = int((volt - v_target_tmp) / vstep)
v_plan = [v_target_tmp + i * vstep for i in range(n_step)]
elif v_target_tmp > volt:
n_step = int((v_target_tmp - volt) / vstep)
v_plan = [v_target_tmp - i * vstep for i in range(n_step)]
else:
return

v_plan = v_plan[1:] + [volt]

for _v in v_plan:
time.sleep(0.5)
set_heater_voltage(_v)
time.sleep(_RAMP_TIME_STEP - 0.5)


def set_heater_output(output, force=False):
"""Turn on or off the PCR500MA source output.

Parameters
----------
output : bool
True to turn on, False to turn off.
force : bool, optional
If True, forcibly set the output bypassing safety checks.
Defaults to False.
"""
pcr = run.CLIENTS['stimulator']['pcr500ma']
resp = pcr.set_output(output=output, force=force)
check_response(pcr, resp)


def heater_recovery(volt, vstep=_VOLT_STEP):
"""Recover the PCR500MA heater output to the target voltage after a power
outage or unexpected shutdown.

Retrieves the current voltage setpoint and output state, then brings the
output back to ``volt`` via the following procedure:

- Output ON : ramp directly to ``volt``.
- Output OFF, setpoint > 0 : reset setpoint to 0 V with
:func:`set_to_0V_heater`, turn output on, then ramp to ``volt``.
- Output OFF, setpoint == 0 : turn output on, then ramp to ``volt``.

Parameters
----------
volt : float
Target voltage in V.
vstep : float, optional
Voltage step for ramping. Defaults to 1 V.
"""
pcr = run.CLIENTS['stimulator']['pcr500ma']

_, _, s_meas = pcr.get_volt_ac()
v_current = s_meas['data']['volt_set']
output = pcr.get_output().session['data']['output']

if output:
ramp_heater(volt, vstep=vstep)
else:
if v_current > 0:
set_to_0V_heater()
set_heater_output(True)
ramp_heater(volt, vstep=vstep)


def calibrate_tau(duration_step=20,
Expand Down Expand Up @@ -77,7 +236,6 @@ def calibrate_tau(duration_step=20,
If None is passed, will be (63/200)*sampling_rate.
"""

blh = run.CLIENTS['stimulator']['blh']
downsample_factor = int(downsample_factor)

try:
Expand All @@ -102,21 +260,14 @@ def calibrate_tau(duration_step=20,

if do_setup:
_setup()
# Rotation setting
resp = blh.set_values(speed=speeds_rpm[0])
check_response(blh, resp)

resp = blh.start_rotation(forward=forward)
check_response(blh, resp)

rotate(speeds_rpm[0], forward=forward)
speeds_rpm = speeds_rpm[1:]

# First data point
time.sleep(duration_step)

for speed_rpm in speeds_rpm:
resp = blh.set_values(speed=speed_rpm)
check_response(blh, resp)
rotate(speed_rpm, start=False)

time.sleep(duration_step)
finally:
Expand Down Expand Up @@ -157,18 +308,14 @@ def calibrate_gain(duration=60, speed_rpm=90,
If None is passed, will be (63/200)*sampling_rate.
"""

blh = run.CLIENTS['stimulator']['blh']
downsample_factor = int(downsample_factor)

try:
resp = blh.set_values(speed=speed_rpm)
check_response(blh, resp)
rotate(speed_rpm, start=False)

if do_setup:
_setup()
# Rotation setting
resp = blh.start_rotation(forward=forward)
check_response(blh, resp)
rotate(speed_rpm, forward=forward)

# Sleep for rotation stabilization
time.sleep(10)
Expand Down Expand Up @@ -234,18 +381,12 @@ def calibrate_gain_tau(duration_gain=60, duration_tau=10, duration_stabilization
If None is passed, will be (63/200)*sampling_rate.
"""

blh = run.CLIENTS['stimulator']['blh']
downsample_factor = int(downsample_factor)

try:
# Shutter and chopper setup
resp = blh.set_values(speed=speed_rpm_gain)
check_response(blh, resp)
_setup()

# Rotation setting
resp = blh.start_rotation(forward=forward)
check_response(blh, resp)
rotate(speed_rpm_gain, forward=forward)

# Sleep for rotation stabilization
time.sleep(duration_stabilization)
Expand All @@ -272,8 +413,7 @@ def calibrate_gain_tau(duration_gain=60, duration_tau=10, duration_stabilization
time.sleep(duration_gain)

for speed_rpm in speeds_rpm_tau:
resp = blh.set_values(speed=speed_rpm)
check_response(blh, resp)
rotate(speed_rpm, start=False)

time.sleep(duration_stabilization)
time.sleep(duration_tau)
Expand Down
Loading