diff --git a/default_config.json b/default_config.json index e1694891..b7bf33ba 100644 --- a/default_config.json +++ b/default_config.json @@ -177,6 +177,5 @@ "active_telescope_index": 0, "active_eyepiece_index": 0 }, - "imu_threshold_scale": 1, - "imu_integrator": "classic" + "imu_threshold_scale": 1 } diff --git a/python/PiFinder/imu_pi_classic.py b/python/PiFinder/imu_pi_classic.py deleted file mode 100644 index 2372fb61..00000000 --- a/python/PiFinder/imu_pi_classic.py +++ /dev/null @@ -1,250 +0,0 @@ -#!/usr/bin/python -# -*- coding:utf-8 -*- -""" -This module is for IMU related functions - -Classic version: Uses axis remapping and euler angle output for the -classic (alt/az offset) integrator dead-reckoning approach. -""" - -import time -from PiFinder.multiproclogging import MultiprocLogging -import board -import adafruit_bno055 -import logging -import quaternion # numpy-quaternion - -from scipy.spatial.transform import Rotation - -from PiFinder import config - -logger = logging.getLogger("IMU.pi") - -QUEUE_LEN = 10 -MOVE_CHECK_LEN = 2 - - -class Imu: - def __init__(self): - i2c = board.I2C() - self.sensor = adafruit_bno055.BNO055_I2C(i2c) - self.sensor.mode = adafruit_bno055.IMUPLUS_MODE - # self.sensor.mode = adafruit_bno055.NDOF_MODE - cfg = config.Config() - if ( - cfg.get_option("screen_direction") == "flat" - or cfg.get_option("screen_direction") == "straight" - or cfg.get_option("screen_direction") == "flat3" - ): - self.sensor.axis_remap = ( - adafruit_bno055.AXIS_REMAP_Y, - adafruit_bno055.AXIS_REMAP_X, - adafruit_bno055.AXIS_REMAP_Z, - adafruit_bno055.AXIS_REMAP_POSITIVE, - adafruit_bno055.AXIS_REMAP_POSITIVE, - adafruit_bno055.AXIS_REMAP_NEGATIVE, - ) - elif cfg.get_option("screen_direction") == "as_bloom": - self.sensor.axis_remap = ( - adafruit_bno055.AXIS_REMAP_X, - adafruit_bno055.AXIS_REMAP_Z, - adafruit_bno055.AXIS_REMAP_Y, - adafruit_bno055.AXIS_REMAP_POSITIVE, - adafruit_bno055.AXIS_REMAP_POSITIVE, - adafruit_bno055.AXIS_REMAP_POSITIVE, - ) - else: - self.sensor.axis_remap = ( - adafruit_bno055.AXIS_REMAP_Z, - adafruit_bno055.AXIS_REMAP_Y, - adafruit_bno055.AXIS_REMAP_X, - adafruit_bno055.AXIS_REMAP_POSITIVE, - adafruit_bno055.AXIS_REMAP_POSITIVE, - adafruit_bno055.AXIS_REMAP_POSITIVE, - ) - self.quat_history = [(0, 0, 0, 0)] * QUEUE_LEN - self._flip_count = 0 - self.calibration = 0 - self.avg_quat = (0, 0, 0, 0) - self.__moving = False - - self.last_sample_time = time.time() - - # Calibration settings - self.imu_sample_frequency = 1 / 30 - - # First value is delta to exceed between samples - # to start moving, second is threshold to fall below - # to stop moving. - - imu_threshold_scale = cfg.get_option("imu_threshold_scale", 1) - self.__moving_threshold = ( - 0.0005 * imu_threshold_scale, - 0.0003 * imu_threshold_scale, - ) - - def quat_to_euler(self, quat): - if quat[0] + quat[1] + quat[2] + quat[3] == 0: - return 0, 0, 0 - rot = Rotation.from_quat(quat) - rot_euler = rot.as_euler("xyz", degrees=True) - # convert from -180/180 to 0/360 - rot_euler[0] += 180 - rot_euler[1] += 180 - rot_euler[2] += 180 - return rot_euler - - def moving(self): - """ - Compares most recent reading - with past readings - """ - return self.__moving - - def update(self): - # check for update frequency - if time.time() - self.last_sample_time < self.imu_sample_frequency: - return - - self.last_sample_time = time.time() - - # Throw out non-calibrated data - self.calibration = self.sensor.calibration_status[1] - if self.calibration == 0: - logger.warning("NOIMU CAL") - return True - quat = self.sensor.quaternion - if quat[0] is None: - logger.warning("IMU: Failed to get sensor values") - return - - _quat_diff = [] - for i in range(4): - _quat_diff.append(abs(quat[i] - self.quat_history[-1][i])) - - self.__reading_diff = sum(_quat_diff) - - # This seems to be some sort of defect / side effect - # of the integration system in the BNO055 - # When not moving quat output will vaccilate - # by exactly this amount... so filter this out - if self.__reading_diff == 0.0078125: - self.__reading_diff = 0 - return - - # Sometimes the quat output will 'flip' and change by 2.0+ - # from one reading to another. This is clearly noise or an - # artifact, so filter them out - if self.__reading_diff > 1.5: - self._flip_count += 1 - if self._flip_count > 10: - # with the history initialized to 0,0,0,0 the unit - # can get stuck seeing flips if the IMU starts - # returning data. This count will reset history - # to the current state if it exceeds 10 - self.quat_history = [quat] * QUEUE_LEN - self.__reading_diff = 0 - else: - self.__reading_diff = 0 - return - else: - # no flip - self._flip_count = 0 - - self.avg_quat = quat - if len(self.quat_history) == QUEUE_LEN: - self.quat_history = self.quat_history[1:] - self.quat_history.append(quat) - - if self.__moving: - if self.__reading_diff < self.__moving_threshold[1]: - self.__moving = False - else: - if self.__reading_diff > self.__moving_threshold[0]: - self.__moving = True - - def get_euler(self): - return list(self.quat_to_euler(self.avg_quat)) - - def __str__(self): - return ( - f"IMU Information:\n" - f"Calibration Status: {self.calibration}\n" - f"Quaternion History: {self.quat_history}\n" - f"Average Quaternion: {self.avg_quat}\n" - f"Moving: {self.moving()}\n" - f"Reading Difference: {self.__reading_diff}\n" - f"Flip Count: {self._flip_count}\n" - f"Last Sample Time: {self.last_sample_time}\n" - f"IMU Sample Frequency: {self.imu_sample_frequency}\n" - f"Moving Threshold: {self.__moving_threshold}\n" - ) - - -def imu_monitor(shared_state, console_queue, log_queue): - MultiprocLogging.configurer(log_queue) - logger.debug("Starting IMU") - imu = None - try: - imu = Imu() - except Exception as e: - logger.error(f"Error starting phyiscal IMU : {e}") - logger.error("Falling back to fake IMU") - console_queue.put("IMU: Error starting physical IMU, using fake IMU") - console_queue.put("DEGRADED_OPS IMU") - from PiFinder.imu_fake import Imu as ImuFake - - imu = ImuFake() - - imu = Imu() - imu_calibrated = False - imu_data = { - "moving": False, - "move_start": None, - "move_end": None, - "pos": [0, 0, 0], - "quat": quaternion.quaternion(0, 0, 0, 0), - "start_pos": [0, 0, 0], - "status": 0, - } - while True: - imu.update() - imu_data["status"] = imu.calibration - if imu.moving(): - if not imu_data["moving"]: - logger.debug("IMU: move start") - imu_data["moving"] = True - imu_data["start_pos"] = imu_data["pos"] - imu_data["move_start"] = time.time() - imu_data["pos"] = imu.get_euler() - imu_data["quat"] = quaternion.from_float_array(imu.avg_quat) - - else: - if imu_data["moving"]: - # If we were moving and we now stopped - logger.debug("IMU: move end") - imu_data["moving"] = False - imu_data["pos"] = imu.get_euler() - imu_data["quat"] = quaternion.from_float_array(imu.avg_quat) - imu_data["move_end"] = time.time() - - if not imu_calibrated: - if imu_data["status"] == 3: - imu_calibrated = True - console_queue.put("IMU: NDOF Calibrated!") - - if shared_state is not None and imu_calibrated: - shared_state.set_imu(imu_data) - - -if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) - logger.info("Trying to read state from IMU") - imu = None - try: - imu = Imu() - for i in range(10): - imu.update() - time.sleep(0.5) - except Exception as e: - logger.exception("Error starting phyiscal IMU", e) diff --git a/python/PiFinder/integrator_classic.py b/python/PiFinder/integrator_classic.py deleted file mode 100644 index 2ed8da79..00000000 --- a/python/PiFinder/integrator_classic.py +++ /dev/null @@ -1,307 +0,0 @@ -#!/usr/bin/python -# -*- coding:utf-8 -*- -""" -This module is the classic integrator using euler-angle/alt-az IMU -dead-reckoning. - -* Checks IMU -* Uses alt/az euler angle offsets from IMU for dead-reckoning -* Converts back to RA/Dec via altaz_to_radec - -""" - -import queue -import time -import copy -import logging - -from PiFinder import config -from PiFinder import state_utils -import PiFinder.calc_utils as calc_utils -from PiFinder.multiproclogging import MultiprocLogging - -IMU_ALT = 2 -IMU_AZ = 0 - -logger = logging.getLogger("IMU.Integrator") - - -def imu_moved(imu_a, imu_b): - """ - Compares two IMU states to determine if they are the 'same' - if either is none, returns False - """ - if imu_a is None: - return False - if imu_b is None: - return False - - # figure out the abs difference - diff = ( - abs(imu_a[0] - imu_b[0]) + abs(imu_a[1] - imu_b[1]) + abs(imu_a[2] - imu_b[2]) - ) - if diff > 0.001: - return True - return False - - -def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=False): - MultiprocLogging.configurer(log_queue) - try: - if is_debug: - logger.setLevel(logging.DEBUG) - logger.debug("Starting Integrator") - - solved = { - "RA": None, - "Dec": None, - "Roll": None, - "camera_center": { - "RA": None, - "Dec": None, - "Roll": None, - "Alt": None, - "Az": None, - }, - "camera_solve": { - "RA": None, - "Dec": None, - "Roll": None, - }, - "Roll_offset": 0, # May/may not be needed - for experimentation - "imu_pos": None, - "imu_quat": None, - "Alt": None, - "Az": None, - "solve_source": None, - "solve_time": None, - "cam_solve_time": 0, - "constellation": None, - } - cfg = config.Config() - if ( - cfg.get_option("screen_direction") == "left" - or cfg.get_option("screen_direction") == "flat" - or cfg.get_option("screen_direction") == "flat3" - or cfg.get_option("screen_direction") == "straight" - ): - flip_alt_offset = True - else: - flip_alt_offset = False - - # This holds the last image solve position info - # so we can delta for IMU updates - last_image_solve = None - last_solve_time = time.time() - while True: - state_utils.sleep_for_framerate(shared_state) - - # Check for new camera solve in queue - next_image_solve = None - try: - next_image_solve = solver_queue.get(block=False) - except queue.Empty: - pass - - if type(next_image_solve) is dict: - # For camera solves, always start from last successful camera solve - # NOT from shared_state (which may contain IMU drift) - # This prevents IMU noise accumulation during failed solves - if last_image_solve: - solved = copy.deepcopy(last_image_solve) - # If no successful solve yet, keep initial solved dict - - # Update solve metadata (always needed for auto-exposure) - for key in [ - "Matches", - "RMSE", - "last_solve_attempt", - "last_solve_success", - ]: - if key in next_image_solve: - solved[key] = next_image_solve[key] - - # Only update position data if solve succeeded (RA not None) - if next_image_solve.get("RA") is not None: - solved.update(next_image_solve) - - # Recalculate Alt/Az for NEW successful solve - location = shared_state.location() - dt = shared_state.datetime() - - if location and dt: - # We have position and time/date and a valid solve! - calc_utils.sf_utils.set_location( - location.lat, - location.lon, - location.altitude, - ) - alt, az = calc_utils.sf_utils.radec_to_altaz( - solved["RA"], - solved["Dec"], - dt, - ) - solved["Alt"] = alt - solved["Az"] = az - - alt, az = calc_utils.sf_utils.radec_to_altaz( - solved["camera_center"]["RA"], - solved["camera_center"]["Dec"], - dt, - ) - solved["camera_center"]["Alt"] = alt - solved["camera_center"]["Az"] = az - - # Experimental: For monitoring roll offset - # Estimate the roll offset due misalignment of the - # camera sensor with the Pole-to-Source great circle. - solved["Roll_offset"] = estimate_roll_offset(solved, dt) - # Find the roll at the target RA/Dec. Note that this doesn't include the - # roll offset so it's not the roll that the PiFinder camear sees but the - # roll relative to the celestial pole - roll_target_calculated = calc_utils.sf_utils.radec_to_roll( - solved["RA"], solved["Dec"], dt - ) - # Compensate for the roll offset. This gives the roll at the target - # as seen by the camera. - solved["Roll"] = roll_target_calculated + solved["Roll_offset"] - - # calculate roll for camera center - roll_target_calculated = calc_utils.sf_utils.radec_to_roll( - solved["camera_center"]["RA"], - solved["camera_center"]["Dec"], - dt, - ) - # Compensate for the roll offset. This gives the roll at the target - # as seen by the camera. - solved["camera_center"]["Roll"] = ( - roll_target_calculated + solved["Roll_offset"] - ) - # For failed solves, preserve ALL position data from previous solve - # Don't recalculate from GPS (causes drift from GPS noise) - - # Set solve_source and push camera solves immediately - if solved["RA"] is not None: - last_image_solve = copy.deepcopy(solved) - solved["solve_source"] = "CAM" - # Calculate constellation for successful solve - solved["constellation"] = ( - calc_utils.sf_utils.radec_to_constellation( - solved["RA"], solved["Dec"] - ) - ) - shared_state.set_solve_state(True) - else: - # Failed solve - clear constellation - solved["solve_source"] = "CAM_FAILED" - solved["constellation"] = "" - shared_state.set_solve_state(False) - - # Push all camera solves (success and failure) immediately - # This ensures auto-exposure sees Matches=0 for failed solves - shared_state.set_solution(solved) - - # Use IMU dead-reckoning from the last camera solve: - # Check we have an alt/az solve, otherwise we can't use the IMU - elif solved["Alt"]: - imu = shared_state.imu() - if imu: - dt = shared_state.datetime() - if last_image_solve and last_image_solve["Alt"]: - # If we have alt, then we have a position/time - - # calc new alt/az - lis_imu = last_image_solve["imu_pos"] - imu_pos = imu["pos"] - if imu_moved(lis_imu, imu_pos): - alt_offset = imu_pos[IMU_ALT] - lis_imu[IMU_ALT] - if flip_alt_offset: - alt_offset = ((alt_offset + 180) % 360 - 180) * -1 - else: - alt_offset = (alt_offset + 180) % 360 - 180 - solved["Alt"] = (last_image_solve["Alt"] - alt_offset) % 360 - solved["camera_center"]["Alt"] = ( - last_image_solve["camera_center"]["Alt"] - alt_offset - ) % 360 - - az_offset = imu_pos[IMU_AZ] - lis_imu[IMU_AZ] - az_offset = (az_offset + 180) % 360 - 180 - solved["Az"] = (last_image_solve["Az"] + az_offset) % 360 - solved["camera_center"]["Az"] = ( - last_image_solve["camera_center"]["Az"] + az_offset - ) % 360 - - # N.B. Assumes that location hasn't changed since last solve - # Turn this into RA/DEC - ( - solved["RA"], - solved["Dec"], - ) = calc_utils.sf_utils.altaz_to_radec( - solved["Alt"], solved["Az"], dt - ) - # Calculate the roll at the target RA/Dec and compensate for the offset. - solved["Roll"] = ( - calc_utils.sf_utils.radec_to_roll( - solved["RA"], solved["Dec"], dt - ) - + solved["Roll_offset"] - ) - - # Now for camera centered solve - ( - solved["camera_center"]["RA"], - solved["camera_center"]["Dec"], - ) = calc_utils.sf_utils.altaz_to_radec( - solved["camera_center"]["Alt"], - solved["camera_center"]["Az"], - dt, - ) - # Calculate the roll at the target RA/Dec and compensate for the offset. - solved["camera_center"]["Roll"] = ( - calc_utils.sf_utils.radec_to_roll( - solved["camera_center"]["RA"], - solved["camera_center"]["Dec"], - dt, - ) - + solved["Roll_offset"] - ) - - solved["solve_time"] = time.time() - solved["solve_source"] = "IMU" - - # Push IMU updates only if newer than last push - # (Camera solves already pushed above at line 185) - if ( - solved["RA"] - and solved["solve_time"] > last_solve_time - and solved.get("solve_source") == "IMU" - ): - last_solve_time = time.time() - # Calculate constellation for IMU dead-reckoning position - solved["constellation"] = calc_utils.sf_utils.radec_to_constellation( - solved["RA"], solved["Dec"] - ) - - # Push IMU update - shared_state.set_solution(solved) - shared_state.set_solve_state(True) - except EOFError: - logger.error("Main no longer running for integrator") - - -def estimate_roll_offset(solved, dt): - """ - Estimate the roll offset due to misalignment of the camera sensor with - the mount/scope's coordinate system. The offset is calculated at the - center of the camera's FoV. - - To calculate the roll with offset: roll = calculated_roll + roll_offset - """ - # Calculate the expected roll at the camera center given the RA/Dec of - # of the camera center. - roll_camera_calculated = calc_utils.sf_utils.radec_to_roll( - solved["camera_center"]["RA"], solved["camera_center"]["Dec"], dt - ) - roll_offset = solved["camera_center"]["Roll"] - roll_camera_calculated - - return roll_offset diff --git a/python/PiFinder/main.py b/python/PiFinder/main.py index aa118538..d6db0427 100644 --- a/python/PiFinder/main.py +++ b/python/PiFinder/main.py @@ -1004,7 +1004,7 @@ def main( hardware_platform = "Fake" display_hardware = "pg_128" imu = importlib.import_module("PiFinder.imu_fake") - integrator = importlib.import_module("PiFinder.integrator_classic") + integrator = importlib.import_module("PiFinder.integrator") gps_monitor = importlib.import_module("PiFinder.gps_fake") else: hardware_platform = "Pi" @@ -1012,12 +1012,8 @@ def main( from rpi_hardware_pwm import HardwarePWM cfg = config.Config() - if cfg.get_option("imu_integrator") == "quaternion": - imu = importlib.import_module("PiFinder.imu_pi") - integrator = importlib.import_module("PiFinder.integrator") - else: - imu = importlib.import_module("PiFinder.imu_pi_classic") - integrator = importlib.import_module("PiFinder.integrator_classic") + imu = importlib.import_module("PiFinder.imu_pi") + integrator = importlib.import_module("PiFinder.integrator") # verify and sync GPSD baud rate try: diff --git a/python/PiFinder/solver.py b/python/PiFinder/solver.py index 2fc6cee3..622b916d 100644 --- a/python/PiFinder/solver.py +++ b/python/PiFinder/solver.py @@ -457,12 +457,8 @@ def solver( if last_image_metadata.get("imu"): solved["imu_quat"] = last_image_metadata["imu"]["quat"] - solved["imu_pos"] = last_image_metadata["imu"].get( - "pos" - ) else: solved["imu_quat"] = None - solved["imu_pos"] = None solved["solve_time"] = time.time() solved["cam_solve_time"] = solved["solve_time"] @@ -562,9 +558,7 @@ def get_initialized_solved_dict() -> dict: "Dec": None, "Roll": None, }, - "imu_pos": None, # IMU euler angles (classic integrator) "imu_quat": None, # IMU quaternion as numpy quaternion (scalar-first) - "Roll_offset": 0, # Roll offset for classic integrator # Alt, Az [deg] of scope: "Alt": None, "Az": None, diff --git a/python/PiFinder/ui/menu_structure.py b/python/PiFinder/ui/menu_structure.py index 6a4f15bd..9a0cabc0 100644 --- a/python/PiFinder/ui/menu_structure.py +++ b/python/PiFinder/ui/menu_structure.py @@ -771,7 +771,7 @@ def _(key: str) -> Any: "select": "single", "label": "chart_settings", "items": [ - { + { "name": _("Coordinate Sys."), "class": UITextMenu, "select": "single", @@ -1161,17 +1161,6 @@ def _(key: str) -> Any: "select": "Single", "items": [ {"name": "SQM", "class": UISQM}, - { - "name": _("Integrator"), - "class": UITextMenu, - "select": "single", - "config_option": "imu_integrator", - "post_callback": callbacks.restart_pifinder, - "items": [ - {"name": _("Classic"), "value": "classic"}, - {"name": _("Quaternion"), "value": "quaternion"}, - ], - }, { "name": _("AE Algo"), "class": UITextMenu,