Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions roborock/devices/local_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
_LOGGER = logging.getLogger(__name__)
_PORT = 58867
_TIMEOUT = 5.0
_PING_INTERVAL = 10
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use a timedelta here to be more explicit about the unit?

Suggested change
_PING_INTERVAL = 10
_PING_INTERVAL = datetime.timedelta(seconds=10)



@dataclass
Expand Down Expand Up @@ -58,6 +59,7 @@ def __init__(self, host: str, local_key: str):
self._subscribers: CallbackList[RoborockMessage] = CallbackList(_LOGGER)
self._is_connected = False
self._local_protocol_version: LocalProtocolVersion | None = None
self._keep_alive_task: asyncio.Task[None] | None = None
self._update_encoder_decoder(
LocalChannelParams(local_key=local_key, connect_nonce=get_next_int(10000, 32767), ack_nonce=None)
)
Expand Down Expand Up @@ -132,6 +134,28 @@ async def _hello(self):

raise RoborockException("Failed to connect to device with any known protocol")

async def _ping(self) -> None:
    """Send a single PING_REQUEST message to the device.

    The request carries the encoded value of ``self.protocol_version`` and is
    sent via ``_send_message`` with the message's own ``seq`` as the request
    id and PING_RESPONSE as the response protocol.
    """
    version = self.protocol_version.encode()
    request = RoborockMessage(protocol=RoborockMessageProtocol.PING_REQUEST, version=version)
    await self._send_message(
        roborock_message=request,
        request_id=request.seq,
        response_protocol=RoborockMessageProtocol.PING_RESPONSE,
    )

async def _keep_alive_loop(self) -> None:
while self._is_connected:
try:
await asyncio.sleep(_PING_INTERVAL)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming you accept the above suggestion

Suggested change
await asyncio.sleep(_PING_INTERVAL)
await asyncio.sleep(_PING_INTERVAL.total_seconds())

if self._is_connected:
await self._ping()
except asyncio.CancelledError:
break
except Exception:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's catch RoborockException (expected) separately from unexpected exceptions so we can log them differently, e.g. "ping failed" vs. "Uncaught exception", the latter implying a bug or shouldn't-happen case that we need to fix. Similar to _background_reconnect in the v1 channel?

_LOGGER.debug("Keep-alive ping failed", exc_info=True)
# Retry next interval
Comment on lines +147 to +157
Copy link

Copilot AI Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new keep-alive functionality lacks test coverage. Consider adding tests to verify:

  1. That _keep_alive_task is created when connect() is called
  2. That the task is properly canceled when close() or _connection_lost() is called
  3. That the ping loop continues to execute periodically while connected
  4. That exceptions in the ping loop are handled gracefully without stopping the loop

Copilot uses AI. Check for mistakes.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot open a new pull request to apply changes based on this feedback


@property
def protocol_version(self) -> LocalProtocolVersion:
"""Return the negotiated local protocol version, or a sensible default."""
Expand Down Expand Up @@ -166,6 +190,7 @@ async def connect(self) -> None:
# Perform protocol negotiation
try:
await self._hello()
self._keep_alive_task = asyncio.create_task(self._keep_alive_loop())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this outside the scope of the try/except. It won't raise RoborockException.

except RoborockException:
# If protocol negotiation fails, clean up the connection state
self.close()
Expand All @@ -177,6 +202,9 @@ def _data_received(self, data: bytes) -> None:

def close(self) -> None:
"""Disconnect from the device."""
if self._keep_alive_task:
self._keep_alive_task.cancel()
self._keep_alive_task = None
if self._transport:
self._transport.close()
else:
Expand All @@ -187,6 +215,9 @@ def close(self) -> None:
def _connection_lost(self, exc: Exception | None) -> None:
"""Handle connection loss."""
_LOGGER.warning("Connection lost to %s", self._host, exc_info=exc)
if self._keep_alive_task:
self._keep_alive_task.cancel()
self._keep_alive_task = None
self._transport = None
self._is_connected = False

Expand Down
109 changes: 109 additions & 0 deletions tests/devices/test_local_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,112 @@ async def mock_do_hello(local_protocol_version: LocalProtocolVersion) -> LocalCh
assert channel._params is not None
assert channel._params.ack_nonce == 11111
assert channel._is_connected is True


async def test_keep_alive_task_created_on_connect(local_channel: LocalChannel, mock_loop: Mock) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should generally avoid tests of internal state like this; I'm not sure it's worthwhile to keep?

"""Test that _keep_alive_task is created when connect() is called."""
# Before connecting, task should be None
assert local_channel._keep_alive_task is None

await local_channel.connect()

# After connecting, task should be created and not done
assert local_channel._keep_alive_task is not None
assert isinstance(local_channel._keep_alive_task, asyncio.Task)
assert not local_channel._keep_alive_task.done()


async def test_keep_alive_task_canceled_on_close(local_channel: LocalChannel, mock_loop: Mock) -> None:
    """Closing the channel cancels the keep-alive task and clears the reference to it."""
    await local_channel.connect()

    # Keep our own handle: close() resets the attribute to None.
    keep_alive = local_channel._keep_alive_task
    assert keep_alive is not None
    assert not keep_alive.done()

    local_channel.close()

    # Yield to the event loop so the cancellation can be delivered to the task.
    await asyncio.sleep(0.01)

    assert keep_alive.cancelled() or keep_alive.done()
    assert local_channel._keep_alive_task is None


async def test_keep_alive_task_canceled_on_connection_lost(local_channel: LocalChannel, mock_loop: Mock) -> None:
    """A lost connection cancels the keep-alive task and clears the reference to it."""
    await local_channel.connect()

    # Keep our own handle: _connection_lost() resets the attribute to None.
    keep_alive = local_channel._keep_alive_task
    assert keep_alive is not None
    assert not keep_alive.done()

    # Simulate the connection dropping without an error.
    local_channel._connection_lost(None)

    # Yield to the event loop so the cancellation can be delivered to the task.
    await asyncio.sleep(0.01)

    assert keep_alive.cancelled() or keep_alive.done()
    assert local_channel._keep_alive_task is None


async def test_keep_alive_ping_loop_executes_periodically(local_channel: LocalChannel, mock_loop: Mock) -> None:
    """Test that the ping loop continues to execute periodically while connected.

    The keep-alive sleep is shortened so that several intervals elapse within
    the test, and ``_ping`` is replaced by a counter so that repeated
    execution is actually observed rather than inferred from the task merely
    existing.
    """
    ping_call_count = 0

    async def mock_ping() -> None:
        nonlocal ping_call_count
        ping_call_count += 1

    original_sleep = asyncio.sleep

    async def mock_sleep(delay: float) -> None:
        # Cap every patched sleep at 10 ms so the ping interval passes quickly.
        await original_sleep(min(delay, 0.01))

    with patch("asyncio.sleep", side_effect=mock_sleep):
        setattr(local_channel, "_ping", mock_ping)

        await local_channel.connect()

        # Verify the task is running and connected
        keep_alive_task = local_channel._keep_alive_task
        assert keep_alive_task is not None
        assert not keep_alive_task.done()
        assert local_channel._is_connected

        # Wait long enough for several shortened intervals to elapse
        await original_sleep(0.1)

        # More than one ping proves the loop repeats instead of firing once
        assert ping_call_count >= 2
        assert not keep_alive_task.done()


async def test_keep_alive_ping_exceptions_handled_gracefully(
    local_channel: LocalChannel, mock_loop: Mock, caplog: pytest.LogCaptureFixture
) -> None:
    """A ping that raises is logged, and the keep-alive task keeps running afterwards."""
    from roborock.devices.local_channel import _PING_INTERVAL

    # The failure is looked up in the captured log records, so capture from DEBUG upwards.
    caplog.set_level("DEBUG")

    attempts: list[None] = []

    async def failing_ping() -> None:
        """Record the attempt, then fail unconditionally."""
        attempts.append(None)
        raise Exception("Test ping failure")

    real_sleep = asyncio.sleep

    async def fast_sleep(delay: float) -> None:
        """Shrink interval-sized sleeps to 10 ms; pass shorter sleeps through unchanged."""
        shortened = 0.01 if delay >= _PING_INTERVAL else delay
        await real_sleep(shortened)

    with patch("asyncio.sleep", side_effect=fast_sleep):
        setattr(local_channel, "_ping", failing_ping)

        await local_channel.connect()

        # Let several shortened intervals elapse.
        await real_sleep(0.1)

        # The task must still be alive despite the failing ping.
        keep_alive = local_channel._keep_alive_task
        assert keep_alive is not None
        assert not keep_alive.done()

    # At least one ping attempt was made...
    assert len(attempts) >= 1

    # ...and its failure was logged rather than crashing the loop.
    logged = [record.message for record in caplog.records]
    assert any("Keep-alive ping failed" in message for message in logged)