From 8bb1b2f9e4005883a959185e48b8d8de88472c1f Mon Sep 17 00:00:00 2001 From: Peter Law Date: Sat, 17 Feb 2018 18:21:57 +0000 Subject: [PATCH 1/4] Remove unused attribute --- robot/board.py | 1 - 1 file changed, 1 deletion(-) diff --git a/robot/board.py b/robot/board.py index 8ee2b59..72b8950 100644 --- a/robot/board.py +++ b/robot/board.py @@ -12,7 +12,6 @@ class Board: """Base class for connections to ``robotd`` board sockets.""" SEND_TIMEOUT_SECS = 6 - RECV_BUFFER_BYTES = 2048 def __init__(self, socket_path: Union[Path, str]) -> None: self.socket_path = Path(socket_path) From b091052f53ed45454265c28524593be8696c68a8 Mon Sep 17 00:00:00 2001 From: Peter Law Date: Sat, 17 Feb 2018 18:22:33 +0000 Subject: [PATCH 2/4] This timeout applies in both directions --- robot/board.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/robot/board.py b/robot/board.py index 72b8950..86544c6 100644 --- a/robot/board.py +++ b/robot/board.py @@ -11,7 +11,7 @@ class Board: """Base class for connections to ``robotd`` board sockets.""" - SEND_TIMEOUT_SECS = 6 + CONNECTION_TIMEOUT_SECS = 6 def __init__(self, socket_path: Union[Path, str]) -> None: self.socket_path = Path(socket_path) @@ -40,7 +40,7 @@ def _connect(self): :param socket_path: Path for the unix socket """ self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.socket.settimeout(self.SEND_TIMEOUT_SECS) + self.socket.settimeout(self.CONNECTION_TIMEOUT_SECS) try: self.socket.connect(str(self.socket_path)) From 0be9c46835d1cfd209664ae2f3cb28e32150015a Mon Sep 17 00:00:00 2001 From: Peter Law Date: Sat, 17 Feb 2018 18:26:06 +0000 Subject: [PATCH 3/4] Drop redundant parameter The 'should_retry' parameter always ends up being 'True', so there's not a lot of point in having it. Iff we find we need it, we can add it back later. For now removing it greatly simplifies reasoning about the system. --- robot/board.py | 25 +++++++++---------------- robot/camera.py | 2 +- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/robot/board.py b/robot/board.py index 86544c6..9e145ca 100644 --- a/robot/board.py +++ b/robot/board.py @@ -98,11 +98,10 @@ def _socket_with_single_retry(self, handler): raise original_exception - def _send(self, message, should_retry=True): + def _send(self, message): """ Send a message to robotd. - :param retry: used internally :param message: message to send """ @@ -111,10 +110,7 @@ def _send(self, message, should_retry=True): def sendall(): self.socket.sendall(data) - if should_retry: - return self._socket_with_single_retry(sendall) - else: - return sendall() + return self._socket_with_single_retry(sendall) def _recv_from_socket(self, size): data = self.socket.recv(size) @@ -122,17 +118,14 @@ def _recv_from_socket(self, size): raise BrokenPipeError() return data - def _receive(self, should_retry=True): + def _receive(self): """ Receive a message from robotd. """ while b'\n' not in self.data: - if should_retry: - message = self._socket_with_single_retry( - lambda: self._recv_from_socket(4096), - ) - else: - message = self._recv_from_socket(4096) + message = self._socket_with_single_retry( + lambda: self._recv_from_socket(4096), + ) self.data += message @@ -141,12 +134,12 @@ def _receive(self, should_retry=True): return json.loads(line.decode('utf-8')) - def _send_and_receive(self, message, should_retry=True): + def _send_and_receive(self, message): """ Send a message to robotd and wait for a response. """ - self._send(message, should_retry) - return self._receive(should_retry) + self._send(message) + return self._receive() def close(self): """ diff --git a/robot/camera.py b/robot/camera.py index bcf3cc5..7b0e514 100644 --- a/robot/camera.py +++ b/robot/camera.py @@ -69,7 +69,7 @@ def see(self) -> ResultList: while True: try: - return self._see_to_results(self._receive(should_retry=True)) + return self._see_to_results(self._receive()) except socket.timeout: if time.time() > abort_after: raise From fd2c6aff12929ea87b3a8619fe6219bb0ea9cd03 Mon Sep 17 00:00:00 2001 From: Peter Law Date: Sat, 17 Feb 2018 19:01:19 +0000 Subject: [PATCH 4/4] Simplify robotd connections This copies over the Connection class from robotd (extending it with types) and uses it to simplify our connection handling. Our version has a notable difference that when it receives an empty message it raises a BrokenPipeError so that this failure is loud. This commit deliberately strips back the retrying support to a minimal single retry which only occurs on errors which are actually fixable by reconnecting. Specifically this means that we now _don't_ catch: - socket.timeout: this would need a very different retry approach than we currently use (we should retry using the same socket, rather than a new one) and isn't likely to be resolvable given that we have a long timeout anyway - OSError: the only error of this class we care about (as far as I'm aware) is FileNotFoundError, which wouldn't be thrown by either the `send` or `receive` methods anyway (only when connecting). --- robot/board.py | 100 ++++++++++++-------------------------------- robot/connection.py | 42 +++++++++++++++++++ 2 files changed, 69 insertions(+), 73 deletions(-) create mode 100644 robot/connection.py diff --git a/robot/board.py b/robot/board.py index 9e145ca..0b81e3d 100644 --- a/robot/board.py +++ b/robot/board.py @@ -1,10 +1,10 @@ -import json import logging import socket -import time from pathlib import Path from typing import Mapping, TypeVar, Union +from .connection import Connection, Message + LOGGER = logging.getLogger(__name__) @@ -15,10 +15,9 @@ class Board: def __init__(self, socket_path: Union[Path, str]) -> None: self.socket_path = Path(socket_path) - self.socket = None - self.data = b'' - self._connect() + self.connection = Connection(self._get_socket()) + self._greeting_response(self.connection.receive()) @property def serial(self): @@ -33,23 +32,28 @@ def _greeting_response(self, data): """ pass - def _connect(self): + def _get_socket(self): """ Connect or reconnect to a socket. :param socket_path: Path for the unix socket """ - self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.socket.settimeout(self.CONNECTION_TIMEOUT_SECS) + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.settimeout(self.CONNECTION_TIMEOUT_SECS) try: - self.socket.connect(str(self.socket_path)) + sock.connect(str(self.socket_path)) except ConnectionRefusedError as e: LOGGER.exception("Error connecting to: '%s'", self.socket_path) raise - greeting = self._receive() - self._greeting_response(greeting) + return sock + + def _reconnect(self) -> None: + self.connection.close() + + self.connection = Connection(self._get_socket()) + self._greeting_response(self.connection.receive()) def _get_lc_error(self) -> str: """ @@ -62,77 +66,27 @@ def _get_lc_error(self) -> str: path=self.socket_path, ) - def _socket_with_single_retry(self, handler): - retryable_errors = ( - socket.timeout, - BrokenPipeError, - OSError, - ConnectionResetError, - ) - - backoffs = [ - 0.1, - 0.5, - 1.0, - 2.0, - 3.0, - ] - - try: - return handler() - except retryable_errors as e: - original_exception = e - - for backoff in backoffs: - time.sleep(backoff) - - try: - self._connect() - except (ConnectionRefusedError, FileNotFoundError): - continue - - try: - return handler() - except retryable_errors: - pass - - raise original_exception - - def _send(self, message): + def _send(self, message: Message) -> None: """ Send a message to robotd. :param message: message to send """ - - data = (json.dumps(message) + '\n').encode('utf-8') - - def sendall(): - self.socket.sendall(data) - - return self._socket_with_single_retry(sendall) - - def _recv_from_socket(self, size): - data = self.socket.recv(size) - if data == b'': - raise BrokenPipeError() - return data + try: + self.connection.send(message) + except (BrokenPipeError, ConnectionError): + self._reconnect() + self.connection.send(message) def _receive(self): """ Receive a message from robotd. """ - while b'\n' not in self.data: - message = self._socket_with_single_retry( - lambda: self._recv_from_socket(4096), - ) - - self.data += message - - line = self.data.split(b'\n', 1)[0] - self.data = self.data[len(line) + 1:] - - return json.loads(line.decode('utf-8')) + try: + return self.connection.receive() + except (BrokenPipeError, ConnectionError): + self._reconnect() + return self.connection.receive() def _send_and_receive(self, message): """ @@ -145,7 +99,7 @@ def close(self): """ Close the the connection to the underlying robotd board. """ - self.socket.detach() + self.connection.close() def __str__(self): return "{} - {}".format(self.__name__, self.serial) diff --git a/robot/connection.py b/robot/connection.py new file mode 100644 index 0000000..ec468c7 --- /dev/null +++ b/robot/connection.py @@ -0,0 +1,42 @@ +import json +import socket +from typing import Any, Dict + +Message = Dict[str, Any] + + +class Connection: + """ + A connection to a device. + + This wraps a ``socket.socket`` providing encoding and decoding so that + consumers of this class can send and receive JSON-compatible typed data + rather than needing to worry about lower-level details. + """ + + def __init__(self, socket: socket.socket) -> None: + """Wrap the given socket.""" + self.socket = socket + self.data = b'' + + def close(self) -> None: + """Close the connection.""" + self.socket.close() + + def send(self, message: Message) -> None: + """Send the given JSON-compatible message over the connection.""" + line = json.dumps(message).encode('utf-8') + b'\n' + self.socket.sendall(line) + + def receive(self) -> Message: + """Receive a single message from the connection.""" + while b'\n' not in self.data: + message = self.socket.recv(4096) + if message == b'': + raise BrokenPipeError() + + self.data += message + line = self.data.split(b'\n', 1)[0] + self.data = self.data[len(line) + 1:] + + return json.loads(line.decode('utf-8'))