diff --git a/src/sorunlib/wiregrid.py b/src/sorunlib/wiregrid.py index d1ca15e..99138e1 100644 --- a/src/sorunlib/wiregrid.py +++ b/src/sorunlib/wiregrid.py @@ -198,6 +198,112 @@ def _check_temperature_sensors(): _verify_temp_response(resp, 'AIN2C', 0) +def _check_wiregrid_position(): + """Check the wiregrid position.""" + actuator = run.CLIENTS['wiregrid']['actuator'] + resp = actuator.acq.status() + ls_status = resp.session['data']['fields']['limitswitch'] + if (ls_status['LSR1'] == 1 or ls_status['LSL1'] == 1) and not (ls_status['LSR2'] == 1 and ls_status['LSL2'] == 1): + position = 'outside' + elif (ls_status['LSR2'] == 1 or ls_status['LSL2'] == 1) and not (ls_status['LSR1'] == 1 or ls_status['LSL1'] == 1): + position = 'inside' + else: + raise RuntimeError("The wiregrid position is unknown. The limitswitch response is like this:\n" + + "LSR1: {}\n".format('ON' if ls_status['LSR1'] == 1 else 'OFF') + + "LSL1: {}\n".format('ON' if ls_status['LSL1'] == 1 else 'OFF') + + "LSR2: {}\n".format('ON' if ls_status['LSR2'] == 1 else 'OFF') + + "LSL2: {}\n".format('ON' if ls_status['LSL2'] == 1 else 'OFF') + + "Aborting...") + return position + + +def _stop_hwp_with_wiregrid(): + """Stop the HWP after comfirming the wiregrid is inserted.""" + # Check the wiregrid position + position = _check_wiregrid_position() + if position == 'outside': + raise RuntimeError("The wiregrid is not inserted. Aborting...") + elif position == 'inside': + # Stop the HWP + print("Starting to stop the HWP.") + run.hwp.stop(active=True) + print("The HWP stopped successfully.") + else: + raise RuntimeError("Unknown wiregrid position. Aborting...") + + +def _spin_hwp_with_wiregrid(target_hwp_direction): + """Spin the HWP to the target direction at 2Hz after comfirming the wiregrid is inserted. + + Args: + target_hwp_direction (str): Target HWP direction, 'forward' or 'backward'. + """ + # Check the wiregrid position + position = _check_wiregrid_position() + if position == 'outside': + raise RuntimeError("The wiregrid is not inserted. Aborting...") + elif position == 'inside': + if target_hwp_direction == 'forward': + print("Starting to spin up the HWP in forward.") + run.hwp.set_freq(freq=2.0) + print("The HWP is spinning at 2Hz in forward successfully.") + elif target_hwp_direction == 'backward': + print("Starting to spin up the HWP in backward.") + run.hwp.set_freq(freq=-2.0) + print("The HWP is spinning at 2Hz in backward successfully.") + else: + raise RuntimeError("Unknown HWP target direction. Aborting...") + else: + raise RuntimeError("Unknown wiregrid position. Aborting...") + + +def _reverse_hwp_with_wiregrid(initial_hwp_direction, streaming=False, stepwise_before=False, stepwise_after=False): + """Change the HWP direction after comfirming the wiregrid is inserted. + + Args: + initial_hwp_direction (str): Initial HWP direction, 'forward' or 'backward'. + streaming (bool): Do SMuRF streaming during the HWP direction chaging or not . Default is False. + stepwise_before (bool): Do stepwise rotation before changing HWP direction or not. Default is False. + stepwise_after (bool): Do stepwise rotation after changing HWP direction or not. Default is False. + """ + if initial_hwp_direction not in ['forward', 'backward']: + raise RuntimeError("Initial HWP direction should be either 'forward' or 'backward'. Aborting...") + + current_hwp_direction = initial_hwp_direction + if current_hwp_direction == 'forward': + target_hwp_direction = 'backward' + elif current_hwp_direction == 'backward': + target_hwp_direction = 'forward' + + # Stop and spin up reversely the HWP + try: + # Enable SMuRF streams + if streaming: + stream_tag = f'wiregrid, wg_time_constant, hwp_change_to_{target_hwp_direction}' + if stepwise_before or stepwise_after: + stream_tag += ', wg_stepwise' + if _check_zenith(): + stream_tag += ', wg_el90' + run.smurf.stream('on', subtype='cal', tag=stream_tag) + # Stepwise rotation before stopping the HWP + if stepwise_before: + rotate(False) + # Stop the HWP + _stop_hwp_with_wiregrid() + # Spin up the HWP in the opposite direction + _spin_hwp_with_wiregrid(target_hwp_direction) + current_hwp_direction = target_hwp_direction + # Stepwise rotation after spinning up the HWP + if stepwise_after: + rotate(False) + finally: + # Stop SMuRF streams + if streaming: + run.smurf.stream('off') + + return current_hwp_direction + + # Public API def insert(): """Insert the wiregrid.""" @@ -305,3 +411,100 @@ def calibrate(continuous=False, elevation_check=True, boresight_check=True, finally: # Stop SMuRF streams run.smurf.stream('off') + + +def time_constant(initial_hwp_direction, stepwise_first=True, stepwise_last=True, stepwise_mid=False, repeat=1): + """ + Run a wiregrid time constant measurement. + + Args: + initial_hwp_direction (str): Initial HWP direction, 'forward' or 'backward'. + stepwise_first (bool): Do stepwise rotation or not before the first HWP speed change. Default is True. + stepwise_last (bool): Do stepwise rotation or not after the last HWP speed change. Default is True. + stepwise_mid (bool): Do stepwise rotation between each HWP speed change. Default is False. + repeat (int): Number of repeats. Default is 1. + If this is odd, the HWP direction will be changed to the opposite of the initial direction. + If this is even, the HWP direction will be the same as the initial direction. + """ + + # Check the initial HWP direction + if initial_hwp_direction not in ['forward', 'backward']: + raise RuntimeError("Initial HWP direction should be either 'forward' or 'backward'. Aborting...") + current_hwp_direction = initial_hwp_direction + + # Check the repeat + if repeat < 1 or not isinstance(repeat, int): + raise RuntimeError("The repeat should be int and larger than 0. Aborting...") + + _check_agents_online() + _check_motor_on() + _check_telescope_position(elevation_check=True, boresight_check=False) + + if _check_zenith: + el_tag = ', wg_el90' + else: + el_tag = '' + + # Rotate for reference before insertion + rotate(continuous=True, duration=10) + + # Bias step (the wire grid is off the window) + bs_tag = f'biasstep, wg_time_constant, wg_ejected, hwp_2hz_{current_hwp_direction}' + el_tag + run.smurf.bias_step(tag=bs_tag, concurrent=True) + + # Insert the wiregrid with streaming + time.sleep(5) + try: + # Enable SMuRF streams + stream_tag = f'wg_time_constant, wg_inserting, hwp_2hz_{current_hwp_direction}' + el_tag + run.smurf.stream('on', tag=stream_tag, subtype='cal') + # Insert the wiregrid + insert() + finally: + # Stop SMuRF streams + run.smurf.stream('off') + time.sleep(5) + + for i in range(repeat): + # Bias step (the wire grid is on the window) + bs_tag = f'biasstep, wg_time_constant, wg_inserted, hwp_2hz_{current_hwp_direction}' + el_tag + run.smurf.bias_step(tag=bs_tag, concurrent=True) + + stepwise_before = False + stepwise_after = False + if stepwise_first and i == 0: + # Stepwise rotation before the first HWP speed change + stepwise_before = True + if stepwise_mid and i != 0: + # Stepwise rotation between changing HWP speed + stepwise_before = True + if stepwise_last and i == repeat - 1: + # Stepwise rotation after the last HWP speed change + stepwise_after = True + + # Spin the HWP in the opposite direction of the initial direction with streaming and stepwise rotation + current_hwp_direction = _reverse_hwp_with_wiregrid(current_hwp_direction, + streaming=True, + stepwise_before=stepwise_before, + stepwise_after=stepwise_after) + + # Bias step (the wire grid is on the window) + bs_tag = f'biasstep, wg_time_constant, wg_inserted, hwp_2hz_{current_hwp_direction}' + el_tag + run.smurf.bias_step(tag=bs_tag, concurrent=True) + + # Eject the wiregrid with streaming + time.sleep(5) + try: + # Enable SMuRF streams + stream_tag = f'wg_time_constant, wg_ejecting, hwp_2hz_{current_hwp_direction}' + el_tag + run.smurf.stream('on', tag=stream_tag, subtype='cal') + # Eject the wiregrid + eject() + finally: + # Stop SMuRF streams + run.smurf.stream('off') + time.sleep(5) + + # Bias step (the wire grid is off the window) + bs_tag = f'biasstep, wg_time_constant, wg_ejected, hwp_2hz_{current_hwp_direction}' + el_tag + run.smurf.bias_step(tag=bs_tag, concurrent=True)