From 6d62aef680ce2a0aa16a453cc6949e483e8e6737 Mon Sep 17 00:00:00 2001 From: zariiii9003 <52598363+zariiii9003@users.noreply.github.com> Date: Sat, 23 Aug 2025 13:12:23 +0200 Subject: [PATCH] close virtual channels --- test/back2back_test.py | 14 +-- test/conftest.py | 24 ++++ test/notifier_test.py | 36 +++--- test/simplecyclic_test.py | 224 +++++++++++++++++++------------------- test/test_bus.py | 8 +- test/zero_dlc_test.py | 45 ++++---- 6 files changed, 187 insertions(+), 164 deletions(-) create mode 100644 test/conftest.py diff --git a/test/back2back_test.py b/test/back2back_test.py index b52bae530..738c9d16b 100644 --- a/test/back2back_test.py +++ b/test/back2back_test.py @@ -164,37 +164,33 @@ def test_message_is_rx(self): ) def test_message_is_rx_receive_own_messages(self): """The same as `test_message_direction` but testing with `receive_own_messages=True`.""" - bus3 = can.Bus( + with can.Bus( channel=self.CHANNEL_2, interface=self.INTERFACE_2, bitrate=self.BITRATE, fd=TEST_CAN_FD, single_handle=True, receive_own_messages=True, - ) - try: + ) as bus3: msg = can.Message( is_extended_id=False, arbitration_id=0x300, data=[2, 1, 3], is_rx=False ) bus3.send(msg) self_recv_msg_bus3 = bus3.recv(self.TIMEOUT) self.assertTrue(self_recv_msg_bus3.is_rx) - finally: - bus3.shutdown() def test_unique_message_instances(self): """Verify that we have a different instances of message for each bus even with `receive_own_messages=True`. """ - bus3 = can.Bus( + with can.Bus( channel=self.CHANNEL_2, interface=self.INTERFACE_2, bitrate=self.BITRATE, fd=TEST_CAN_FD, single_handle=True, receive_own_messages=True, - ) - try: + ) as bus3: msg = can.Message( is_extended_id=False, arbitration_id=0x300, data=[2, 1, 3] ) @@ -209,8 +205,6 @@ def test_unique_message_instances(self): recv_msg_bus1.data[0] = 4 self.assertNotEqual(recv_msg_bus1.data, recv_msg_bus2.data) self.assertEqual(recv_msg_bus2.data, self_recv_msg_bus3.data) - finally: - bus3.shutdown() def test_fd_message(self): msg = can.Message( diff --git a/test/conftest.py b/test/conftest.py new file mode 100644 index 000000000..c54238be1 --- /dev/null +++ b/test/conftest.py @@ -0,0 +1,24 @@ +import pytest + +from can.interfaces import virtual + + +@pytest.fixture(autouse=True) +def check_unclosed_virtual_channel(): + """ + Pytest fixture for detecting leaked virtual CAN channels. + + - The fixture yields control to the test. + - After the test completes, it acquires `virtual.channels_lock` and asserts + that `virtual.channels` is empty. + - If a test leaves behind any unclosed virtual CAN channels, the assertion + will fail, surfacing resource leaks early. + + This helps maintain test isolation and prevents subtle bugs caused by + leftover state between tests. + """ + + yield + + with virtual.channels_lock: + assert len(virtual.channels) == 0 diff --git a/test/notifier_test.py b/test/notifier_test.py index c21d51f04..d8512a00b 100644 --- a/test/notifier_test.py +++ b/test/notifier_test.py @@ -20,23 +20,25 @@ def test_single_bus(self): self.assertTrue(notifier.stopped) def test_multiple_bus(self): - with can.Bus(0, interface="virtual", receive_own_messages=True) as bus1: - with can.Bus(1, interface="virtual", receive_own_messages=True) as bus2: - reader = can.BufferedReader() - notifier = can.Notifier([bus1, bus2], [reader], 0.1) - self.assertFalse(notifier.stopped) - msg = can.Message() - bus1.send(msg) - time.sleep(0.1) - bus2.send(msg) - recv_msg = reader.get_message(1) - self.assertIsNotNone(recv_msg) - self.assertEqual(recv_msg.channel, 0) - recv_msg = reader.get_message(1) - self.assertIsNotNone(recv_msg) - self.assertEqual(recv_msg.channel, 1) - notifier.stop() - self.assertTrue(notifier.stopped) + with ( + can.Bus(0, interface="virtual", receive_own_messages=True) as bus1, + can.Bus(1, interface="virtual", receive_own_messages=True) as bus2, + ): + reader = can.BufferedReader() + notifier = can.Notifier([bus1, bus2], [reader], 0.1) + self.assertFalse(notifier.stopped) + msg = can.Message() + bus1.send(msg) + time.sleep(0.1) + bus2.send(msg) + recv_msg = reader.get_message(1) + self.assertIsNotNone(recv_msg) + self.assertEqual(recv_msg.channel, 0) + recv_msg = reader.get_message(1) + self.assertIsNotNone(recv_msg) + self.assertEqual(recv_msg.channel, 1) + notifier.stop() + self.assertTrue(notifier.stopped) def test_context_manager(self): with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: diff --git a/test/simplecyclic_test.py b/test/simplecyclic_test.py index c4c1a2340..91bbf3bbe 100644 --- a/test/simplecyclic_test.py +++ b/test/simplecyclic_test.py @@ -36,123 +36,120 @@ def test_cycle_time(self): is_extended_id=False, arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5, 6, 7] ) - with can.interface.Bus(interface="virtual") as bus1: - with can.interface.Bus(interface="virtual") as bus2: - # disabling the garbage collector makes the time readings more reliable - gc.disable() - - task = bus1.send_periodic(msg, 0.01, 1) - self.assertIsInstance(task, can.broadcastmanager.CyclicSendTaskABC) + with ( + can.interface.Bus(interface="virtual") as bus1, + can.interface.Bus(interface="virtual") as bus2, + ): + # disabling the garbage collector makes the time readings more reliable + gc.disable() + + task = bus1.send_periodic(msg, 0.01, 1) + self.assertIsInstance(task, can.broadcastmanager.CyclicSendTaskABC) - sleep(2) - size = bus2.queue.qsize() - # About 100 messages should have been transmitted - self.assertTrue( - 80 <= size <= 120, - "100 +/- 20 messages should have been transmitted. But queue contained {}".format( - size - ), - ) - last_msg = bus2.recv() - next_last_msg = bus2.recv() + sleep(2) + size = bus2.queue.qsize() + # About 100 messages should have been transmitted + self.assertTrue( + 80 <= size <= 120, + "100 +/- 20 messages should have been transmitted. But queue contained {}".format( + size + ), + ) + last_msg = bus2.recv() + next_last_msg = bus2.recv() - # we need to reenable the garbage collector again - gc.enable() + # we need to reenable the garbage collector again + gc.enable() - # Check consecutive messages are spaced properly in time and have - # the same id/data - self.assertMessageEqual(last_msg, next_last_msg) + # Check consecutive messages are spaced properly in time and have + # the same id/data + self.assertMessageEqual(last_msg, next_last_msg) - # Check the message id/data sent is the same as message received - # Set timestamp and channel to match recv'd because we don't care - # and they are not initialized by the can.Message constructor. - msg.timestamp = last_msg.timestamp - msg.channel = last_msg.channel - self.assertMessageEqual(msg, last_msg) + # Check the message id/data sent is the same as message received + # Set timestamp and channel to match recv'd because we don't care + # and they are not initialized by the can.Message constructor. + msg.timestamp = last_msg.timestamp + msg.channel = last_msg.channel + self.assertMessageEqual(msg, last_msg) def test_removing_bus_tasks(self): - bus = can.interface.Bus(interface="virtual") - tasks = [] - for task_i in range(10): - msg = can.Message( - is_extended_id=False, - arbitration_id=0x123, - data=[0, 1, 2, 3, 4, 5, 6, 7], - ) - msg.arbitration_id = task_i - task = bus.send_periodic(msg, 0.1, 1) - tasks.append(task) - self.assertIsInstance(task, can.broadcastmanager.CyclicSendTaskABC) + with can.interface.Bus(interface="virtual") as bus: + tasks = [] + for task_i in range(10): + msg = can.Message( + is_extended_id=False, + arbitration_id=0x123, + data=[0, 1, 2, 3, 4, 5, 6, 7], + ) + msg.arbitration_id = task_i + task = bus.send_periodic(msg, 0.1, 1) + tasks.append(task) + self.assertIsInstance(task, can.broadcastmanager.CyclicSendTaskABC) - assert len(bus._periodic_tasks) == 10 + assert len(bus._periodic_tasks) == 10 - for task in tasks: - # Note calling task.stop will remove the task from the Bus's internal task management list - task.stop() + for task in tasks: + # Note calling task.stop will remove the task from the Bus's internal task management list + task.stop() - self.join_threads([task.thread for task in tasks], 5.0) + self.join_threads([task.thread for task in tasks], 5.0) - assert len(bus._periodic_tasks) == 0 - bus.shutdown() + assert len(bus._periodic_tasks) == 0 def test_managed_tasks(self): - bus = can.interface.Bus(interface="virtual", receive_own_messages=True) - tasks = [] - for task_i in range(3): - msg = can.Message( - is_extended_id=False, - arbitration_id=0x123, - data=[0, 1, 2, 3, 4, 5, 6, 7], - ) - msg.arbitration_id = task_i - task = bus.send_periodic(msg, 0.1, 10, store_task=False) - tasks.append(task) - self.assertIsInstance(task, can.broadcastmanager.CyclicSendTaskABC) - - assert len(bus._periodic_tasks) == 0 + with can.interface.Bus(interface="virtual", receive_own_messages=True) as bus: + tasks = [] + for task_i in range(3): + msg = can.Message( + is_extended_id=False, + arbitration_id=0x123, + data=[0, 1, 2, 3, 4, 5, 6, 7], + ) + msg.arbitration_id = task_i + task = bus.send_periodic(msg, 0.1, 10, store_task=False) + tasks.append(task) + self.assertIsInstance(task, can.broadcastmanager.CyclicSendTaskABC) - # Self managed tasks should still be sending messages - for _ in range(50): - received_msg = bus.recv(timeout=5.0) - assert received_msg is not None - assert received_msg.arbitration_id in {0, 1, 2} + assert len(bus._periodic_tasks) == 0 - for task in tasks: - task.stop() + # Self managed tasks should still be sending messages + for _ in range(50): + received_msg = bus.recv(timeout=5.0) + assert received_msg is not None + assert received_msg.arbitration_id in {0, 1, 2} - self.join_threads([task.thread for task in tasks], 5.0) + for task in tasks: + task.stop() - bus.shutdown() + self.join_threads([task.thread for task in tasks], 5.0) def test_stopping_perodic_tasks(self): - bus = can.interface.Bus(interface="virtual") - tasks = [] - for task_i in range(10): - msg = can.Message( - is_extended_id=False, - arbitration_id=0x123, - data=[0, 1, 2, 3, 4, 5, 6, 7], - ) - msg.arbitration_id = task_i - task = bus.send_periodic(msg, 0.1, 1) - tasks.append(task) - - assert len(bus._periodic_tasks) == 10 - # stop half the tasks using the task object - for task in tasks[::2]: - task.stop() + with can.interface.Bus(interface="virtual") as bus: + tasks = [] + for task_i in range(10): + msg = can.Message( + is_extended_id=False, + arbitration_id=0x123, + data=[0, 1, 2, 3, 4, 5, 6, 7], + ) + msg.arbitration_id = task_i + task = bus.send_periodic(msg, 0.1, 1) + tasks.append(task) - assert len(bus._periodic_tasks) == 5 + assert len(bus._periodic_tasks) == 10 + # stop half the tasks using the task object + for task in tasks[::2]: + task.stop() - # stop the other half using the bus api - bus.stop_all_periodic_tasks(remove_tasks=False) - self.join_threads([task.thread for task in tasks], 5.0) + assert len(bus._periodic_tasks) == 5 - # Tasks stopped via `stop_all_periodic_tasks` with remove_tasks=False should - # still be associated with the bus (e.g. for restarting) - assert len(bus._periodic_tasks) == 5 + # stop the other half using the bus api + bus.stop_all_periodic_tasks(remove_tasks=False) + self.join_threads([task.thread for task in tasks], 5.0) - bus.shutdown() + # Tasks stopped via `stop_all_periodic_tasks` with remove_tasks=False should + # still be associated with the bus (e.g. for restarting) + assert len(bus._periodic_tasks) == 5 def test_restart_perodic_tasks(self): period = 0.01 @@ -214,25 +211,26 @@ def _read_all_messages(_bus: "can.interfaces.virtual.VirtualBus") -> None: @unittest.skipIf(IS_CI, "fails randomly when run on CI server") def test_thread_based_cyclic_send_task(self): - bus = can.ThreadSafeBus(interface="virtual") - msg = can.Message( - is_extended_id=False, arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5, 6, 7] - ) + with can.ThreadSafeBus(interface="virtual") as bus: + msg = can.Message( + is_extended_id=False, + arbitration_id=0x123, + data=[0, 1, 2, 3, 4, 5, 6, 7], + ) - # good case, bus is up - on_error_mock = MagicMock(return_value=False) - task = can.broadcastmanager.ThreadBasedCyclicSendTask( - bus=bus, - lock=bus._lock_send_periodic, - messages=msg, - period=0.1, - duration=3, - on_error=on_error_mock, - ) - sleep(1) - on_error_mock.assert_not_called() - task.stop() - bus.shutdown() + # good case, bus is up + on_error_mock = MagicMock(return_value=False) + task = can.broadcastmanager.ThreadBasedCyclicSendTask( + bus=bus, + lock=bus._lock_send_periodic, + messages=msg, + period=0.1, + duration=3, + on_error=on_error_mock, + ) + sleep(1) + on_error_mock.assert_not_called() + task.stop() # bus has been shut down on_error_mock = MagicMock(return_value=False) diff --git a/test/test_bus.py b/test/test_bus.py index 24421b2fd..6a09a6deb 100644 --- a/test/test_bus.py +++ b/test/test_bus.py @@ -8,11 +8,11 @@ def test_bus_ignore_config(): with patch.object( target=can.util, attribute="load_config", side_effect=can.util.load_config ): - _ = can.Bus(interface="virtual", ignore_config=True) - assert not can.util.load_config.called + with can.Bus(interface="virtual", ignore_config=True): + assert not can.util.load_config.called - _ = can.Bus(interface="virtual") - assert can.util.load_config.called + with can.Bus(interface="virtual"): + assert can.util.load_config.called @patch.object(can.bus.BusABC, "shutdown") diff --git a/test/zero_dlc_test.py b/test/zero_dlc_test.py index d6693e294..e8ae8c293 100644 --- a/test/zero_dlc_test.py +++ b/test/zero_dlc_test.py @@ -12,35 +12,40 @@ class ZeroDLCTest(unittest.TestCase): def test_recv_non_zero_dlc(self): - bus_send = can.interface.Bus(interface="virtual") - bus_recv = can.interface.Bus(interface="virtual") - data = [0, 1, 2, 3, 4, 5, 6, 7] - msg_send = can.Message(is_extended_id=False, arbitration_id=0x100, data=data) + with ( + can.interface.Bus(interface="virtual") as bus_send, + can.interface.Bus(interface="virtual") as bus_recv, + ): + data = [0, 1, 2, 3, 4, 5, 6, 7] + msg_send = can.Message( + is_extended_id=False, arbitration_id=0x100, data=data + ) - bus_send.send(msg_send) - msg_recv = bus_recv.recv() + bus_send.send(msg_send) + msg_recv = bus_recv.recv() - # Receiving a frame with data should evaluate msg_recv to True - self.assertTrue(msg_recv) + # Receiving a frame with data should evaluate msg_recv to True + self.assertTrue(msg_recv) def test_recv_none(self): - bus_recv = can.interface.Bus(interface="virtual") + with can.interface.Bus(interface="virtual") as bus_recv: + msg_recv = bus_recv.recv(timeout=0) - msg_recv = bus_recv.recv(timeout=0) - - # Receiving nothing should evaluate msg_recv to False - self.assertFalse(msg_recv) + # Receiving nothing should evaluate msg_recv to False + self.assertFalse(msg_recv) def test_recv_zero_dlc(self): - bus_send = can.interface.Bus(interface="virtual") - bus_recv = can.interface.Bus(interface="virtual") - msg_send = can.Message(is_extended_id=False, arbitration_id=0x100, data=[]) + with ( + can.interface.Bus(interface="virtual") as bus_send, + can.interface.Bus(interface="virtual") as bus_recv, + ): + msg_send = can.Message(is_extended_id=False, arbitration_id=0x100, data=[]) - bus_send.send(msg_send) - msg_recv = bus_recv.recv() + bus_send.send(msg_send) + msg_recv = bus_recv.recv() - # Receiving a frame without data (dlc == 0) should evaluate msg_recv to True - self.assertTrue(msg_recv) + # Receiving a frame without data (dlc == 0) should evaluate msg_recv to True + self.assertTrue(msg_recv) if __name__ == "__main__":