diff --git a/robot/board.py b/robot/board.py index 8ee2b59..0b81e3d 100644 --- a/robot/board.py +++ b/robot/board.py @@ -1,25 +1,23 @@ -import json import logging import socket -import time from pathlib import Path from typing import Mapping, TypeVar, Union +from .connection import Connection, Message + LOGGER = logging.getLogger(__name__) class Board: """Base class for connections to ``robotd`` board sockets.""" - SEND_TIMEOUT_SECS = 6 - RECV_BUFFER_BYTES = 2048 + CONNECTION_TIMEOUT_SECS = 6 def __init__(self, socket_path: Union[Path, str]) -> None: self.socket_path = Path(socket_path) - self.socket = None - self.data = b'' - self._connect() + self.connection = Connection(self._get_socket()) + self._greeting_response(self.connection.receive()) @property def serial(self): @@ -34,23 +32,28 @@ def _greeting_response(self, data): """ pass - def _connect(self): + def _get_socket(self): """ Connect or reconnect to a socket. :param socket_path: Path for the unix socket """ - self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.socket.settimeout(self.SEND_TIMEOUT_SECS) + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.settimeout(self.CONNECTION_TIMEOUT_SECS) try: - self.socket.connect(str(self.socket_path)) + sock.connect(str(self.socket_path)) except ConnectionRefusedError as e: LOGGER.exception("Error connecting to: '%s'", self.socket_path) raise - greeting = self._receive() - self._greeting_response(greeting) + return sock + + def _reconnect(self) -> None: + self.connection.close() + + self.connection = Connection(self._get_socket()) + self._greeting_response(self.connection.receive()) def _get_lc_error(self) -> str: """ @@ -63,97 +66,40 @@ def _get_lc_error(self) -> str: path=self.socket_path, ) - def _socket_with_single_retry(self, handler): - retryable_errors = ( - socket.timeout, - BrokenPipeError, - OSError, - ConnectionResetError, - ) - - backoffs = [ - 0.1, - 0.5, - 1.0, - 2.0, - 3.0, - ] - - try: - return handler() - except retryable_errors as e: - original_exception = e - - for backoff in backoffs: - time.sleep(backoff) - - try: - self._connect() - except (ConnectionRefusedError, FileNotFoundError): - continue - - try: - return handler() - except retryable_errors: - pass - - raise original_exception - - def _send(self, message, should_retry=True): + def _send(self, message: Message) -> None: """ Send a message to robotd. - :param retry: used internally :param message: message to send """ + try: + self.connection.send(message) + except (BrokenPipeError, ConnectionError): + self._reconnect() + self.connection.send(message) - data = (json.dumps(message) + '\n').encode('utf-8') - - def sendall(): - self.socket.sendall(data) - - if should_retry: - return self._socket_with_single_retry(sendall) - else: - return sendall() - - def _recv_from_socket(self, size): - data = self.socket.recv(size) - if data == b'': - raise BrokenPipeError() - return data - - def _receive(self, should_retry=True): + def _receive(self): """ Receive a message from robotd. """ - while b'\n' not in self.data: - if should_retry: - message = self._socket_with_single_retry( - lambda: self._recv_from_socket(4096), - ) - else: - message = self._recv_from_socket(4096) - - self.data += message - - line = self.data.split(b'\n', 1)[0] - self.data = self.data[len(line) + 1:] - - return json.loads(line.decode('utf-8')) + try: + return self.connection.receive() + except (BrokenPipeError, ConnectionError): + self._reconnect() + return self.connection.receive() - def _send_and_receive(self, message, should_retry=True): + def _send_and_receive(self, message): """ Send a message to robotd and wait for a response. """ - self._send(message, should_retry) - return self._receive(should_retry) + self._send(message) + return self._receive() def close(self): """ Close the the connection to the underlying robotd board. """ - self.socket.detach() + self.connection.close() def __str__(self): return "{} - {}".format(self.__name__, self.serial) diff --git a/robot/camera.py b/robot/camera.py index bcf3cc5..7b0e514 100644 --- a/robot/camera.py +++ b/robot/camera.py @@ -69,7 +69,7 @@ def see(self) -> ResultList: while True: try: - return self._see_to_results(self._receive(should_retry=True)) + return self._see_to_results(self._receive()) except socket.timeout: if time.time() > abort_after: raise diff --git a/robot/connection.py b/robot/connection.py new file mode 100644 index 0000000..ec468c7 --- /dev/null +++ b/robot/connection.py @@ -0,0 +1,42 @@ +import json +import socket +from typing import Any, Dict + +Message = Dict[str, Any] + + +class Connection: + """ + A connection to a device. + + This wraps a ``socket.socket`` providing encoding and decoding so that + consumers of this class can send and receive JSON-compatible typed data + rather than needing to worry about lower-level details. + """ + + def __init__(self, socket: socket.socket) -> None: + """Wrap the given socket.""" + self.socket = socket + self.data = b'' + + def close(self) -> None: + """Close the connection.""" + self.socket.close() + + def send(self, message: Message) -> None: + """Send the given JSON-compatible message over the connection.""" + line = json.dumps(message).encode('utf-8') + b'\n' + self.socket.sendall(line) + + def receive(self) -> Message: + """Receive a single message from the connection.""" + while b'\n' not in self.data: + message = self.socket.recv(4096) + if message == b'': + raise BrokenPipeError() + + self.data += message + line = self.data.split(b'\n', 1)[0] + self.data = self.data[len(line) + 1:] + + return json.loads(line.decode('utf-8'))