Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 110 additions & 7 deletions python/PiFinder/integrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@
- Refactor into class PointingTracker

"""
from __future__ import annotations # To support | in typehints (remove this for Python 3.10+)

from __future__ import (
annotations,
) # To support | in typehints (remove this for Python 3.10+)

import queue
import time
import copy
import logging
import numpy as np
import quaternion # numpy-quaternion
import datetime

from PiFinder import config
from PiFinder import state_utils
Expand Down Expand Up @@ -62,7 +66,9 @@ def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=Fa
last_solve_time = time.time()

while True:
pointing_updated = False # Flag to track if pointing was updated in this loop
pointing_updated = (
False # Flag to track if pointing was updated in this loop
)
state_utils.sleep_for_framerate(shared_state)

# Check for new camera solve in queue
Expand Down Expand Up @@ -108,9 +114,12 @@ def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=Fa
update_plate_solve_and_imu(imu_dead_reckoning, solved)
pointing_updated = True
else:
# TODO: In this case, it should run update_imu()
# Failed solve - clear constellation
solved["solve_source"] = "CAM_FAILED"
solved["constellation"] = "" # NOTE: This gets over-written by IMU dead-reckoning
solved["constellation"] = (
"" # NOTE: This gets over-written by IMU dead-reckoning
)
# Push failed solved immediately
# This ensures auto-exposure sees Matches=0 for failed solves
shared_state.set_solution(solved)
Expand All @@ -121,18 +130,22 @@ def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=Fa
# the last plate solved coordinates.
imu = shared_state.imu()
if imu:
# last_solve_time = time.time()
update_imu(imu_dead_reckoning, solved, last_image_solve, imu)
pointing_updated = True

# Update Alt, Az only if newer than last push
if pointing_updated and solved["solve_time"] > last_solve_time:
solved["constellation"] = get_constellation(solved["RA"], solved["Dec"])

# TODO: Altaz doesn't seem to be required for catalogs when in
# TODO: Altaz doesn't seem to be required for catalogs when in
# EQ mode? Could be disabled in future when in EQ mode?
solved["Alt"], solved["Az"] = get_alt_az(solved["RA"], solved["Dec"],
shared_state.location(),
shared_state.datetime())
solved["Alt"], solved["Az"] = get_alt_az(
solved["RA"],
solved["Dec"],
shared_state.location(),
shared_state.datetime(),
)

if (solved["RA"] is not None) and (solved["Dec"] is not None):
# Push new solved to shared state
Expand All @@ -146,6 +159,7 @@ def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=Fa

# ======== Wrapper and helper functions ===============================


def update_plate_solve_and_imu(imu_dead_reckoning: ImuDeadReckoning, solved: dict):
"""
Wrapper for ImuDeadReckoning.update_plate_solve_and_imu() to
Expand Down Expand Up @@ -243,6 +257,38 @@ def update_imu(
)


def update_solved_coords(solved, location, dt, mount_type):
"""
Based on RA/Dec, update the following in dictionary 'solved':
solved["Alt"], solved["Az"], solved["Roll"], solved["constellation"]
"""
if solved["RA"] is None or solved["Dec"] is None:
solved["constellation"] = None
solved["Alt"], solved["Az"] = None, None
solved["Roll"] = None
return

# Calculate constellation for current position
solved["constellation"] = calc_utils.sf_utils.radec_to_constellation(
solved["RA"], solved["Dec"]
)

if location and dt:
# Location needs to be set for Alt/Az and roll calculations
calc_utils.sf_utils.set_location(location.lat, location.lon, location.altitude)
# Set Alt/Az
solved["Alt"], solved["Az"] = calc_utils.sf_utils.radec_to_altaz(
solved["RA"], solved["Dec"], dt
)
# Set the roll so that the chart is displayed appropriately for the mount type
solved["Roll"] = get_roll_by_mount_type(
solved["RA"], solved["Dec"], location, dt, mount_type
)
else:
solved["Alt"], solved["Az"] = None, None
solved["Roll"] = None


def get_constellation(RA_deg, Dec_deg) -> str:
"""
Get constellation name from the current RA/Dec position.
Expand Down Expand Up @@ -285,3 +331,60 @@ def set_cam2scope_alignment(imu_dead_reckoning: ImuDeadReckoning, solved: dict):

# Set alignment in imu_dead_reckoning
imu_dead_reckoning.set_cam2scope_alignment(solved_cam, solved_scope)


def get_roll_by_mount_type(
ra_deg: float, # Right Ascension of the target in degrees
dec_deg: float, # Declination of the target in degrees
location, # astropy EarthLocation object or None
dt: datetime.datetime, # datetime.datetime object or None
mount_type: str, # "Alt/Az" or "EQ"
) -> float:
"""
Returns the roll (in degrees) depending on the mount type so that the chart
is displayed appropriately for the mount type. The RA and Dec of the target
should be provided (in degrees).

* Alt/Az mount: Display the chart in the horizontal coordinate so that up
in the chart points to the Zenith.
* EQ mount: Display the chart in the equatorial coordinate system with the
NCP up so roll = 0.

Assumes that location has already been set in calc_utils.sf_utils.
"""
if mount_type == "Alt/Az":
# Altaz mounts: Display chart in horizontal coordinates
if location and dt:
"""
# We have location and time/date (and assume that location has been set)
# Roll at the target RA/Dec in the horizontal frame
roll_deg = calc_utils.sf_utils.radec_to_roll(ra_deg, dec_deg, dt)

# HACK:
# The IMU direction flips at a certaint point. Could due to a
# an issue in the formula in calc_utils.sf_utils.hadec_to_roll()
# This is a temperary hack for testing.
ha_deg = calc_utils.sf_utils.ra_to_ha(ra_deg, dt)
roll_deg = (
roll_deg - np.sign(ha_deg) * 180
) # In essence, gives: roll_deg = -pa_deg
# End of HACK
"""
# chart.py uses roll to rotate the chart around the target center
# by roll in anti-clockwise direction. Use -parallactic_angle
roll_deg = -calc_utils.sf_utils.radec_to_pa(ra_deg, dec_deg, dt)
else:
# No position or time/date available. Default to display in equatorial coordinate
roll_deg = 0.0 # NCP up
elif mount_type == "EQ":
# EQ-mounts: Display chart in equatorial coordinates
roll_deg = 0.0 # NCP up
# If location is available, adjust roll for hemisphere:
if location:
if location.lat < 0.0:
roll_deg = 180.0 # SCP up (for southern hemisphere)
else:
logger.error(f"Unknown mount type: {mount_type}. Cannot set roll.")
roll_deg = 0.0 # NCP up

return roll_deg
Loading