"""
Celestron AUX Protocol Library
This module implements the binary communication protocol used by Celestron
telescope mounts via the AUX bus. It provides classes for command creation,
parsing, and asynchronous communication over Serial or TCP.
References:
- NexStar AUX Command Set documentation
- indi-celestronaux C++ implementation
"""
import struct
import time
import asyncio
import logging
import serial_asyncio
from enum import Enum
from typing import Optional, Union, List, Tuple, Any, TYPE_CHECKING
if TYPE_CHECKING:
from asyncio import StreamReader, StreamWriter
logger = logging.getLogger(__name__)
[docs]
class AUXCommands(Enum):
"""Enumeration of Celestron AUX bus commands."""
MC_GET_POSITION = 0x01
MC_GOTO_FAST = 0x02
MC_SET_POSITION = 0x04
MC_GET_MODEL = 0x05
MC_SET_POS_GUIDERATE = 0x06
MC_SET_NEG_GUIDERATE = 0x07
MC_LEVEL_START = 0x0B
MC_LEVEL_DONE = 0x12
MC_SLEW_DONE = 0x13
MC_GOTO_SLOW = 0x17
MC_SEEK_DONE = 0x18
MC_SEEK_INDEX = 0x19
MC_MOVE_POS = 0x24
MC_MOVE_NEG = 0x25
MC_AUX_GUIDE = 0x26
MC_AUX_GUIDE_ACTIVE = 0x27
MC_ENABLE_CORDWRAP = 0x38
MC_DISABLE_CORDWRAP = 0x39
MC_SET_CORDWRAP_POS = 0x3A
MC_POLL_CORDWRAP = 0x3B
MC_GET_CORDWRAP_POS = 0x3C
MC_SET_AUTOGUIDE_RATE = 0x46
MC_GET_AUTOGUIDE_RATE = 0x47
MC_GET_APPROACH = 0xFC
MC_SET_APPROACH = 0xFD
GET_VER = 0xFE
# Simulation Backdoor (NOT for use with real hardware)
SIM_GET_SKY_POSITION = 0xFF
GPS_GET_LAT = 0x01
GPS_GET_LONG = 0x02
GPS_SET_LAT = 0x31
GPS_SET_LONG = 0x32
GPS_SET_TIME = 0x34
GPS_GET_TIME = 0x33
GPS_SET_DATE = 0x3C
GPS_GET_DATE = 0x3B
GPS_TIME_VALID = 0x36
GPS_LINKED = 0x37
GPS_GET_SATS = 0x38
FOC_GET_HS_POSITIONS = 0x2C
# Power/Battery
PWR_GET_VOLTAGE = 0x01
PWR_GET_CURRENT = 0x02
PWR_GET_STATUS = 0x03
[docs]
class AUXTargets(Enum):
"""Enumeration of devices on the AUX bus."""
ANY = 0x00
MB = 0x01 # Main Board
HC = 0x04 # Hand Controller
HCP = 0x0D # Hand Controller Plus?
AZM = 0x10 # Azimuth / RA Motor
ALT = 0x11 # Altitude / Dec Motor
FOCUS = 0x12 # Focuser
APP = 0x20 # Software Application
GPS = 0xB0 # GPS Module
WiFi = 0xB5 # WiFi Module
BAT = 0xB6 # Battery
CHG = 0xB7 # Charger
LIGHT = 0xBF # Lighting (Evolution)
[docs]
class AUXCommand:
"""
Represents a single Celestron AUX bus command packet.
Attributes:
command (AUXCommands): The command to execute.
source (AUXTargets): The sender of the command.
destination (AUXTargets): The target device.
data (bytes): Optional payload.
length (int): Length of (source + destination + command + data).
"""
START_BYTE = 0x3B
MAX_CMD_LEN = 32
def __init__(
self,
command: AUXCommands,
source: AUXTargets,
destination: AUXTargets,
data: bytes = b"",
) -> None:
self.command = command
self.source = source
self.destination = destination
self.data = data
self.length = 3 + len(self.data)
[docs]
def fill_buf(self) -> bytes:
"""
Serializes the command into a byte buffer for transmission.
Returns:
bytes: The complete packet (START | LEN | SRC | DST | CMD | DATA... | CS).
"""
buf = bytearray()
buf.append(self.START_BYTE)
buf.append(self.length)
buf.append(self.source.value)
buf.append(self.destination.value)
buf.append(self.command.value)
buf.extend(self.data)
buf.append(self._calculate_checksum(buf[1:]))
return bytes(buf)
[docs]
@classmethod
def parse_buf(cls, buf: bytes) -> "AUXCommand":
"""
Parses a byte buffer into an AUXCommand object.
Args:
buf (bytes): Received bytes.
Returns:
AUXCommand: The parsed command object.
Raises:
ValueError: If the start byte is invalid or buffer is too short.
"""
if not buf or buf[0] != cls.START_BYTE:
raise ValueError(f"Invalid start byte or empty buffer: {buf.hex()}")
length = buf[1]
source = AUXTargets(buf[2])
destination = AUXTargets(buf[3])
command = AUXCommands(buf[4])
data = buf[5:-1]
checksum = buf[-1]
calculated_checksum = cls._calculate_checksum(buf[1:-1])
if calculated_checksum != checksum:
# We log but continue, as some mounts have flaky checksums
logger.error(
f"Checksum error: Expected {calculated_checksum:02X}, Got {checksum:02X} for buffer {buf.hex()}"
)
cmd = cls(command, source, destination, data)
cmd.length = length
return cmd
@staticmethod
def _calculate_checksum(data: Union[bytes, bytearray]) -> int:
"""
Calculates the AUX checksum (2's complement of sum).
Args:
data (bytes/bytearray): Data to checksum.
Returns:
int: 8-bit checksum value.
"""
cs = sum(data)
return ((~cs) + 1) & 0xFF
[docs]
def get_data_as_int(self) -> int:
"""
Converts command data bytes to a big-endian integer.
Returns:
int: The integer value of the payload.
"""
value = 0
if len(self.data) == 3:
value = (self.data[0] << 16) | (self.data[1] << 8) | self.data[2]
elif len(self.data) == 2:
value = (self.data[0] << 8) | self.data[1]
elif len(self.data) == 1:
value = self.data[0]
return value
[docs]
def set_data_from_int(self, value: int, num_bytes: int) -> None:
"""
Sets the command data payload from an integer.
Args:
value (int): The integer value.
num_bytes (int): Number of bytes to use (1, 2, or 3).
"""
if num_bytes == 1:
self.data = value.to_bytes(1, "big")
elif num_bytes == 2:
self.data = value.to_bytes(2, "big")
elif num_bytes == 3:
self.data = value.to_bytes(3, "big")
else:
raise ValueError("num_bytes must be 1, 2, or 3")
self.length = 3 + len(self.data)
def __repr__(self) -> str:
return f"AUXCommand(cmd={self.command.name}, src={self.source.name}, dst={self.destination.name}, data={self.data.hex()}, len={self.length})"
[docs]
def unpack_int3_steps(d: bytes) -> int:
"""
Unpacks 3 bytes into a 24-bit unsigned integer (encoder steps).
Args:
d (bytes): 3 bytes of data.
Returns:
int: Encoder steps.
"""
if len(d) != 3:
raise ValueError("Input bytes must be 3 bytes long for unpack_int3_steps")
return int.from_bytes(d, "big", signed=False)
[docs]
def pack_int3_steps(val: float) -> bytes:
"""
Packs a float or integer into 3 big-endian bytes.
"""
v = int(round(val))
if not 0 <= v < 2**24:
v = v % 16777216
return v.to_bytes(3, "big", signed=False)
# Constants for encoder calculations
STEPS_PER_REVOLUTION = 16777216
STEPS_PER_DEGREE = STEPS_PER_REVOLUTION / 360.0
STEPS_PER_ARCSEC = STEPS_PER_DEGREE / 3600.0
DEGREES_PER_STEP = 360.0 / STEPS_PER_REVOLUTION
[docs]
class AUXCommunicator:
"""
Handles asynchronous communication with the AUX bus.
Supports Serial (via pyserial-asyncio) and TCP (via socket:// prefix).
Implements echo-skipping for one-wire bus environments.
Attributes:
port (str): Device path (e.g. /dev/ttyUSB0) or URL (socket://host:port).
baudrate (int): Communication speed (default 19200).
timeout (float): Read timeout in seconds.
"""
def __init__(self, port: str, baudrate: int = 19200, timeout: float = 1.0) -> None:
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.reader: Optional["StreamReader"] = None
self.writer: Optional["StreamWriter"] = None
self.connected = False
self.lock = asyncio.Lock()
[docs]
async def connect(self) -> bool:
"""
Establishes connection to the AUX bus.
Returns:
bool: True if successful, False otherwise.
"""
try:
if self.port.startswith("socket://"):
host, port = self.port[9:].split(":")
self.reader, self.writer = await asyncio.open_connection(
host, int(port)
)
else:
self.reader, self.writer = await serial_asyncio.open_serial_connection(
url=self.port, baudrate=self.baudrate
)
self.connected = True
logger.info(
f"Communicator: Connected to {self.port} at {self.baudrate} baud."
)
return True
except Exception as e:
logger.error(f"Communicator: Error connecting: {e}")
self.connected = False
return False
[docs]
async def disconnect(self) -> None:
"""Closes the connection."""
if self.writer and self.connected:
self.writer.close()
await self.writer.wait_closed()
self.connected = False
logger.info(f"Communicator: Disconnected from {self.port}")
[docs]
async def send_command(self, command: AUXCommand) -> Optional[AUXCommand]:
"""
Sends an AUX command and waits for a response.
Implements echo skipping: if the received packet matches the sent one
(common on AUX bus), it is ignored, and the next packet is read.
Args:
command (AUXCommand): Command to send.
Returns:
AUXCommand: The response packet, or None on failure/timeout.
"""
if not self.connected or not self.writer or not self.reader:
return None
async with self.lock:
tx_buf = command.fill_buf()
try:
self.writer.write(tx_buf)
await self.writer.drain()
while True:
# 1. Wait for Start Byte
start_byte = await asyncio.wait_for(
self.reader.readexactly(1), timeout=self.timeout
)
if start_byte[0] != AUXCommand.START_BYTE:
continue
# 2. Read Length
length_byte = await asyncio.wait_for(
self.reader.readexactly(1), timeout=self.timeout
)
response_length = length_byte[0]
# 3. Read payload + Checksum
remaining_bytes = await asyncio.wait_for(
self.reader.readexactly(response_length + 1),
timeout=self.timeout,
)
rx_buf = start_byte + length_byte + remaining_bytes
resp = AUXCommand.parse_buf(rx_buf)
# 4. Skip Echo
if (
resp.source == command.source
and resp.destination == command.destination
and resp.command == command.command
):
continue
return resp
except Exception as e:
logger.error(
f"Communicator: Error in send_command: {type(e).__name__}: {e}"
)
return None