diff --git a/ros2topic/ros2topic/verb/bw.py b/ros2topic/ros2topic/verb/bw.py index fdb1487e2..e5c2d61bc 100644 --- a/ros2topic/ros2topic/verb/bw.py +++ b/ros2topic/ros2topic/verb/bw.py @@ -40,6 +40,7 @@ import rclpy from rclpy.executors import ExternalShutdownException +from rclpy.subscription_content_filter_options import ContentFilterOptions from ros2cli.helpers import interactive_select from ros2cli.node.direct import add_arguments as add_direct_node_arguments @@ -96,6 +97,12 @@ def add_arguments(self, parser, cli_name): '--window', '-w', dest='window_size', type=positive_int, default=DEFAULT_WINDOW_SIZE, help='maximum window size, in # of messages, for calculating rate ' f'(default: {DEFAULT_WINDOW_SIZE})', metavar='WINDOW') + parser.add_argument( + '--content-filter', dest='content_filter_expr', default=None, + help='DDS content filter expression applied at the middleware level') + parser.add_argument( + '--content-filter-params', dest='content_filter_params', nargs='*', default=[], + help='Parameters for the content filter expression') add_direct_node_arguments(parser) def main(self, *, args): @@ -127,6 +134,12 @@ def main(args): topics = args.topic_name + content_filter_options = None + if args.content_filter_expr: + content_filter_options = ContentFilterOptions( + filter_expression=args.content_filter_expr, + expression_parameters=args.content_filter_params) + with DirectNode(args) as node: # Get all available topics at this moment if args.all_topics: @@ -139,7 +152,8 @@ def main(args): return print(f'Subscribing to all {len(topics)} available topics...') return _rostopic_bw(node.node, topics, qos_args=args, window_size=args.window_size, - all_topics=args.all_topics) + all_topics=args.all_topics, + content_filter_options=content_filter_options) class ROSTopicBandwidth(object): @@ -293,7 +307,8 @@ def _get_ascii_table(header, cols): return table -def _rostopic_bw(node, topics, qos_args, window_size=DEFAULT_WINDOW_SIZE, all_topics=False): +def _rostopic_bw(node, topics, qos_args, window_size=DEFAULT_WINDOW_SIZE, all_topics=False, + content_filter_options=None): """Periodically print the received bandwidth of topics to console until shutdown.""" # pause bw until topic is published rt = ROSTopicBandwidth(node, window_size) @@ -314,7 +329,8 @@ def _rostopic_bw(node, topics, qos_args, window_size=DEFAULT_WINDOW_SIZE, all_to topic, functools.partial(rt.callback, topic=topic), qos_profile, - raw=True + raw=True, + content_filter_options=content_filter_options ) print('Subscribed to [%s]' % topic) diff --git a/ros2topic/ros2topic/verb/echo.py b/ros2topic/ros2topic/verb/echo.py index 009d94a12..8ac3635a4 100644 --- a/ros2topic/ros2topic/verb/echo.py +++ b/ros2topic/ros2topic/verb/echo.py @@ -21,6 +21,7 @@ from rclpy.event_handler import UnsupportedEventTypeError from rclpy.node import Node from rclpy.qos import QoSProfile +from rclpy.subscription_content_filter_options import ContentFilterOptions from rclpy.task import Future from ros2cli.helpers import interactive_select @@ -107,6 +108,15 @@ def add_arguments(self, parser, cli_name): '--filter', dest='filter_expr', help='Python expression to filter messages that ' 'are printed. Expression can use Python builtins ' 'as well as m (the message).') + parser.add_argument( + '--content-filter', dest='content_filter_expr', default=None, + help='DDS content filter expression applied at the middleware level ' + '(e.g. "data > 100"). Only messages matching the filter are ' + 'delivered to the subscriber. Use %%0, %%1, ... as parameter ' + 'placeholders with --content-filter-params.') + parser.add_argument( + '--content-filter-params', dest='content_filter_params', nargs='*', default=[], + help='Parameters for the content filter expression') parser.add_argument( '--once', action='store_true', help='Print the first message received and then exit.') parser.add_argument( @@ -192,13 +202,20 @@ def main(self, *, args): if args.timeout is not None: self.timer = node.create_timer(args.timeout, self._timed_out) + content_filter_options = None + if args.content_filter_expr: + content_filter_options = ContentFilterOptions( + filter_expression=args.content_filter_expr, + expression_parameters=args.content_filter_params) + self.subscribe_and_spin( node, args.topic_name, message_type, qos_profile, args.no_lost_messages, - args.raw) + args.raw, + content_filter_options=content_filter_options) def subscribe_and_spin( self, @@ -208,6 +225,7 @@ def subscribe_and_spin( qos_profile: QoSProfile, no_report_lost_messages: bool, raw: bool, + content_filter_options: Optional[ContentFilterOptions] = None, ) -> Optional[str]: """Initialize a node with a single subscription and spin.""" event_callbacks = None @@ -221,7 +239,8 @@ def subscribe_and_spin( self._subscriber_callback, qos_profile, event_callbacks=event_callbacks, - raw=raw) + raw=raw, + content_filter_options=content_filter_options) except UnsupportedEventTypeError: assert not no_report_lost_messages node.create_subscription( @@ -230,7 +249,8 @@ def subscribe_and_spin( self._subscriber_callback, qos_profile, event_callbacks=None, - raw=raw) + raw=raw, + content_filter_options=content_filter_options) if self.future is not None: rclpy.spin_until_future_complete(node, self.future) diff --git a/ros2topic/ros2topic/verb/hz.py b/ros2topic/ros2topic/verb/hz.py index b18ca2cf6..9afe7f9f9 100644 --- a/ros2topic/ros2topic/verb/hz.py +++ b/ros2topic/ros2topic/verb/hz.py @@ -41,6 +41,7 @@ from rclpy.clock import Clock from rclpy.clock import ClockType from rclpy.executors import ExternalShutdownException +from rclpy.subscription_content_filter_options import ContentFilterOptions from ros2cli.helpers import interactive_select from ros2cli.node.direct import add_arguments as add_direct_node_arguments @@ -95,6 +96,12 @@ def add_arguments(self, parser, cli_name): dest='use_wtime', default=False, action='store_true', help='calculates rate using wall time which can be helpful' ' when clock is not published during simulation') + parser.add_argument( + '--content-filter', dest='content_filter_expr', default=None, + help='DDS content filter expression applied at the middleware level') + parser.add_argument( + '--content-filter-params', dest='content_filter_params', nargs='*', default=[], + help='Parameters for the content filter expression') add_direct_node_arguments(parser) def main(self, *, args): @@ -134,6 +141,12 @@ def eval_fn(m): else: filter_expr = None + content_filter_options = None + if args.content_filter_expr: + content_filter_options = ContentFilterOptions( + filter_expression=args.content_filter_expr, + expression_parameters=args.content_filter_params) + with DirectNode(args) as node: # Get all available topics at this moment if args.all_topics: @@ -149,7 +162,8 @@ def eval_fn(m): return _rostopic_hz( node.node, topics, qos_args=args, window_size=args.window_size, filter_expr=filter_expr, use_wtime=args.use_wtime, - all_topics=args.all_topics) + all_topics=args.all_topics, + content_filter_options=content_filter_options) class ROSTopicHz(object): @@ -337,7 +351,7 @@ def _get_ascii_table(header, cols): def _rostopic_hz(node, topics, qos_args, window_size=DEFAULT_WINDOW_SIZE, filter_expr=None, - use_wtime=False, all_topics=False): + use_wtime=False, all_topics=False, content_filter_options=None): """ Periodically print the publishing rate of a topic to console until shutdown. @@ -346,6 +360,7 @@ def _rostopic_hz(node, topics, qos_args, window_size=DEFAULT_WINDOW_SIZE, filter :param window_size: number of messages to average over, -1 for infinite, ``int`` :param filter_expr: Python filter expression that is called with m, the message instance :param all_topics: whether all topics are being monitored, ``bool`` + :param content_filter_options: DDS content filter options, ``ContentFilterOptions`` """ # pause hz until topic is published rt = ROSTopicHz(node, window_size, filter_expr=filter_expr, use_wtime=use_wtime) @@ -367,7 +382,8 @@ def _rostopic_hz(node, topics, qos_args, window_size=DEFAULT_WINDOW_SIZE, filter topic, functools.partial(rt.callback_hz, topic=topic), qos_profile, - raw=filter_expr is None) + raw=filter_expr is None, + content_filter_options=content_filter_options) if topics_len > 1: print('Subscribed to [%s]' % topic) diff --git a/ros2topic/test/test_bw_delay_hz.py b/ros2topic/test/test_bw_delay_hz.py index 2b1ede299..2106b4650 100644 --- a/ros2topic/test/test_bw_delay_hz.py +++ b/ros2topic/test/test_bw_delay_hz.py @@ -43,6 +43,8 @@ from rclpy.qos import ReliabilityPolicy from rclpy.utilities import get_rmw_implementation_identifier +from std_msgs.msg import String + # Skip cli tests on Windows while they exhibit pathological behavior # https://github.com/ros2/build_farmer/issues/248 @@ -504,3 +506,197 @@ def test_bw_both_all_and_topics_error(self, launch_service, proc_info, proc_outp assert 'Cannot specify both --all/-a and topic names' in command.output, ( 'bw command did not print expected error message' ) + + @launch_testing.markers.retry_on_failure(times=5) + def test_hz_content_filter(self, launch_service, proc_info, proc_output): + topic = '/clitest/topic/hz_content_filter' + publisher = self.node.create_publisher(String, topic, 10) + assert publisher + + def publish_message(): + publisher.publish(String(data='hello')) + + publish_timer = self.node.create_timer(0.5, publish_message) + + # Wait for the publisher to be discovered + publisher_count = 0 + timeout_count = 0 + while publisher_count == 0 and timeout_count < 10: + self.executor.spin_once(timeout_sec=0.1) + publisher_count = self.node.count_publishers(topic) + timeout_count += 1 + assert publisher_count > 0, 'Publisher was not discovered' + + try: + command_action = ExecuteProcess( + cmd=['ros2', 'topic', 'hz', + '--content-filter', "data = 'hello'", + topic], + additional_env={ + 'PYTHONUNBUFFERED': '1' + }, + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, command_action, proc_info, proc_output, + output_filter=launch_testing_ros.tools.basic_output_filter( + filtered_rmw_implementation=get_rmw_implementation_identifier() + ) + ) as command: + # The future won't complete - we will hit the timeout + self.executor.spin_until_future_complete( + rclpy.task.Future(), timeout_sec=5 + ) + command.wait_for_shutdown(timeout=10) + assert command.output, 'hz with content filter printed no output' + assert re.search( + r'^average rate: [0-9\.]+$', command.output, flags=re.MULTILINE + ), 'hz with content filter did not print expected rate' + finally: + self.node.destroy_timer(publish_timer) + self.node.destroy_publisher(publisher) + + @launch_testing.markers.retry_on_failure(times=5) + def test_bw_content_filter(self, launch_service, proc_info, proc_output): + topic = '/clitest/topic/bw_content_filter' + publisher = self.node.create_publisher(String, topic, 10) + assert publisher + + def publish_message(): + publisher.publish(String(data='hello')) + + publish_timer = self.node.create_timer(0.5, publish_message) + + # Wait for the publisher to be discovered + publisher_count = 0 + timeout_count = 0 + while publisher_count == 0 and timeout_count < 10: + self.executor.spin_once(timeout_sec=0.1) + publisher_count = self.node.count_publishers(topic) + timeout_count += 1 + assert publisher_count > 0, 'Publisher was not discovered' + + try: + command_action = ExecuteProcess( + cmd=['ros2', 'topic', 'bw', + '--content-filter', "data = 'hello'", + topic], + additional_env={ + 'PYTHONUNBUFFERED': '1' + }, + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, command_action, proc_info, proc_output, + output_filter=launch_testing_ros.tools.basic_output_filter( + filtered_rmw_implementation=get_rmw_implementation_identifier() + ) + ) as command: + # The future won't complete - we will hit the timeout + self.executor.spin_until_future_complete( + rclpy.task.Future(), timeout_sec=5 + ) + command.wait_for_shutdown(timeout=10) + assert command.output, 'bw with content filter printed no output' + assert re.search( + r'^[0-9]+ B/s from [0-9]+ messages$', command.output, flags=re.MULTILINE + ), 'bw with content filter did not print expected bandwidth' + finally: + self.node.destroy_timer(publish_timer) + self.node.destroy_publisher(publisher) + + @launch_testing.markers.retry_on_failure(times=5) + def test_hz_content_filter_no_match(self, launch_service, proc_info, proc_output): + topic = '/clitest/topic/hz_cfilter_nomatch' + publisher = self.node.create_publisher(String, topic, 10) + assert publisher + + def publish_message(): + publisher.publish(String(data='hello')) + + publish_timer = self.node.create_timer(0.5, publish_message) + + # Wait for the publisher to be discovered + publisher_count = 0 + timeout_count = 0 + while publisher_count == 0 and timeout_count < 10: + self.executor.spin_once(timeout_sec=0.1) + publisher_count = self.node.count_publishers(topic) + timeout_count += 1 + assert publisher_count > 0, 'Publisher was not discovered' + + try: + command_action = ExecuteProcess( + cmd=['ros2', 'topic', 'hz', + '--content-filter', "data = 'NOMATCH'", + topic], + additional_env={ + 'PYTHONUNBUFFERED': '1' + }, + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, command_action, proc_info, proc_output, + output_filter=launch_testing_ros.tools.basic_output_filter( + filtered_rmw_implementation=get_rmw_implementation_identifier() + ) + ) as command: + self.executor.spin_until_future_complete( + rclpy.task.Future(), timeout_sec=5 + ) + command.wait_for_shutdown(timeout=10) + # No messages should match, so no rate should be reported + assert not re.search( + r'^average rate:', command.output, flags=re.MULTILINE + ), 'hz should not report rate when content filter rejects all messages' + finally: + self.node.destroy_timer(publish_timer) + self.node.destroy_publisher(publisher) + + @launch_testing.markers.retry_on_failure(times=5) + def test_bw_content_filter_no_match(self, launch_service, proc_info, proc_output): + topic = '/clitest/topic/bw_cfilter_nomatch' + publisher = self.node.create_publisher(String, topic, 10) + assert publisher + + def publish_message(): + publisher.publish(String(data='hello')) + + publish_timer = self.node.create_timer(0.5, publish_message) + + # Wait for the publisher to be discovered + publisher_count = 0 + timeout_count = 0 + while publisher_count == 0 and timeout_count < 10: + self.executor.spin_once(timeout_sec=0.1) + publisher_count = self.node.count_publishers(topic) + timeout_count += 1 + assert publisher_count > 0, 'Publisher was not discovered' + + try: + command_action = ExecuteProcess( + cmd=['ros2', 'topic', 'bw', + '--content-filter', "data = 'NOMATCH'", + topic], + additional_env={ + 'PYTHONUNBUFFERED': '1' + }, + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, command_action, proc_info, proc_output, + output_filter=launch_testing_ros.tools.basic_output_filter( + filtered_rmw_implementation=get_rmw_implementation_identifier() + ) + ) as command: + self.executor.spin_until_future_complete( + rclpy.task.Future(), timeout_sec=5 + ) + command.wait_for_shutdown(timeout=10) + # No messages should match, so no bandwidth should be reported + assert not re.search( + r'^[0-9]+ B/s', command.output, flags=re.MULTILINE + ), 'bw should not report bandwidth when content filter rejects all messages' + finally: + self.node.destroy_timer(publish_timer) + self.node.destroy_publisher(publisher) diff --git a/ros2topic/test/test_content_filter_contract.py b/ros2topic/test/test_content_filter_contract.py new file mode 100644 index 000000000..012a77ff1 --- /dev/null +++ b/ros2topic/test/test_content_filter_contract.py @@ -0,0 +1,127 @@ +# Copyright 2026 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Contract tests for --content-filter argument handling. + +These are lightweight unit tests that verify the interface contract between +CLI argument parsing and ContentFilterOptions construction, without requiring +a running ROS system. +""" + +import argparse +import unittest +from unittest.mock import MagicMock +from unittest.mock import patch + +from rclpy.subscription_content_filter_options import ContentFilterOptions + + +class TestContentFilterArgParsing(unittest.TestCase): + """Verify that each verb correctly parses --content-filter args.""" + + def _parse_echo_args(self, args_list): + from ros2topic.verb.echo import EchoVerb + verb = EchoVerb() + parser = argparse.ArgumentParser() + verb.add_arguments(parser, 'ros2 topic') + return parser.parse_args(args_list) + + def _parse_hz_args(self, args_list): + from ros2topic.verb.hz import HzVerb + verb = HzVerb() + parser = argparse.ArgumentParser() + verb.add_arguments(parser, 'ros2 topic') + return parser.parse_args(args_list) + + def _parse_bw_args(self, args_list): + from ros2topic.verb.bw import BwVerb + verb = BwVerb() + parser = argparse.ArgumentParser() + verb.add_arguments(parser, 'ros2 topic') + return parser.parse_args(args_list) + + def test_echo_content_filter_arg_present(self): + args = self._parse_echo_args([ + '/topic', '--content-filter', "data = 'hello'"]) + assert args.content_filter_expr == "data = 'hello'" + assert args.content_filter_params == [] + + def test_echo_content_filter_arg_absent(self): + args = self._parse_echo_args(['/topic']) + assert args.content_filter_expr is None + assert args.content_filter_params == [] + + def test_echo_content_filter_with_params(self): + args = self._parse_echo_args([ + '/topic', '--content-filter', 'data = %0', + '--content-filter-params', 'hello']) + assert args.content_filter_expr == 'data = %0' + assert args.content_filter_params == ['hello'] + + def test_echo_content_filter_with_multiple_params(self): + args = self._parse_echo_args([ + '/topic', '--content-filter', 'data BETWEEN %0 AND %1', + '--content-filter-params', '10', '20']) + assert args.content_filter_expr == 'data BETWEEN %0 AND %1' + assert args.content_filter_params == ['10', '20'] + + def test_hz_content_filter_arg_present(self): + args = self._parse_hz_args([ + '/topic', '--content-filter', "data = 'hello'"]) + assert args.content_filter_expr == "data = 'hello'" + assert args.content_filter_params == [] + + def test_hz_content_filter_arg_absent(self): + args = self._parse_hz_args(['/topic']) + assert args.content_filter_expr is None + assert args.content_filter_params == [] + + def test_bw_content_filter_arg_present(self): + args = self._parse_bw_args([ + '/topic', '--content-filter', "data = 'hello'"]) + assert args.content_filter_expr == "data = 'hello'" + assert args.content_filter_params == [] + + def test_bw_content_filter_arg_absent(self): + args = self._parse_bw_args(['/topic']) + assert args.content_filter_expr is None + assert args.content_filter_params == [] + + +class TestContentFilterOptionsConstruction(unittest.TestCase): + """Verify ContentFilterOptions is constructed correctly from parsed args.""" + + def test_options_with_expression_only(self): + opts = ContentFilterOptions( + filter_expression="data = 'hello'", + expression_parameters=[]) + assert opts.filter_expression == "data = 'hello'" + assert opts.expression_parameters == [] + + def test_options_with_expression_and_params(self): + opts = ContentFilterOptions( + filter_expression='data = %0', + expression_parameters=['hello']) + assert opts.filter_expression == 'data = %0' + assert opts.expression_parameters == ['hello'] + + def test_options_none_when_no_filter(self): + """Contract: when content_filter_expr is None, no options are created.""" + content_filter_expr = None + content_filter_options = None + if content_filter_expr: + content_filter_options = ContentFilterOptions( + filter_expression=content_filter_expr, + expression_parameters=[]) + assert content_filter_options is None diff --git a/ros2topic/test/test_echo_pub.py b/ros2topic/test/test_echo_pub.py index 12404c56b..5dc3aa9f8 100644 --- a/ros2topic/test/test_echo_pub.py +++ b/ros2topic/test/test_echo_pub.py @@ -502,6 +502,144 @@ def publish_message(): self.node.destroy_timer(publish_timer) self.node.destroy_publisher(publisher) + @launch_testing.markers.retry_on_failure(times=5) + def test_echo_content_filter(self, launch_service, proc_info, proc_output): + params = [ + ('/clitest/topic/echo_cfilter_match', "data = 'hello'", True), + ('/clitest/topic/echo_cfilter_nomatch', "data = 'NOMATCH'", False), + ] + for topic, filter_expr, has_output in params: + with self.subTest(topic=topic, filter_expr=filter_expr): + publisher = self.node.create_publisher(String, topic, 10) + assert publisher + + def publish_message(): + publisher.publish(String(data='hello')) + + publish_timer = self.node.create_timer(0.5, publish_message) + + try: + command_action = ExecuteProcess( + cmd=(['ros2', 'topic', 'echo'] + + ['--content-filter', filter_expr] + + [topic, 'std_msgs/String']), + additional_env={ + 'PYTHONUNBUFFERED': '1' + }, + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, command_action, proc_info, proc_output, + output_filter=launch_testing_ros.tools.basic_output_filter( + filtered_rmw_implementation=get_rmw_implementation_identifier() + ) + ) as command: + # The future won't complete - we will hit the timeout + self.executor.spin_until_future_complete( + rclpy.task.Future(), timeout_sec=5 + ) + command.wait_for_shutdown(timeout=10) + # Check results + if has_output: + assert 'hello' in command.output, ( + 'Echo with content filter did not print expected message') + else: + assert 'hello' not in command.output, ( + 'Content filter should have filtered all messages') + + finally: + # Cleanup + self.node.destroy_timer(publish_timer) + self.node.destroy_publisher(publisher) + + @launch_testing.markers.retry_on_failure(times=5) + def test_echo_content_filter_once(self, launch_service, proc_info, proc_output): + topic = '/clitest/topic/echo_cfilter_once' + publisher = self.node.create_publisher(String, topic, 10) + assert publisher + + def publish_message(): + publisher.publish(String(data='hello')) + + publish_timer = self.node.create_timer(1.0, publish_message) + + try: + command_action = ExecuteProcess( + cmd=['ros2', 'topic', 'echo', + '--content-filter', "data = 'hello'", + '--once', + topic, 'std_msgs/msg/String'], + additional_env={ + 'PYTHONUNBUFFERED': '1' + }, + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, command_action, proc_info, proc_output, + output_filter=launch_testing_ros.tools.basic_output_filter( + filtered_rmw_implementation=get_rmw_implementation_identifier() + ) + ) as command: + # Spin so publisher timer fires + self.executor.spin_until_future_complete( + rclpy.task.Future(), timeout_sec=3 + ) + assert command.wait_for_shutdown(timeout=5) + assert launch_testing.tools.expect_output( + expected_lines=[ + 'data: hello', + '---', + ], + text=command.output, + strict=True + ) + finally: + self.node.destroy_timer(publish_timer) + self.node.destroy_publisher(publisher) + + @launch_testing.markers.retry_on_failure(times=5) + def test_echo_content_filter_combined_with_filter( + self, launch_service, proc_info, proc_output + ): + topic = '/clitest/topic/echo_cfilter_combined' + publisher = self.node.create_publisher(String, topic, 10) + assert publisher + + def publish_message(): + publisher.publish(String(data='hello')) + + publish_timer = self.node.create_timer(0.5, publish_message) + + try: + # DDS content filter passes messages, Python filter rejects all. + # This proves both filters are applied: DDS allows, Python blocks. + command_action = ExecuteProcess( + cmd=['ros2', 'topic', 'echo', + '--content-filter', "data = 'hello'", + '--filter', "m.data == 'NOMATCH'", + topic, 'std_msgs/String'], + additional_env={ + 'PYTHONUNBUFFERED': '1' + }, + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, command_action, proc_info, proc_output, + output_filter=launch_testing_ros.tools.basic_output_filter( + filtered_rmw_implementation=get_rmw_implementation_identifier() + ) + ) as command: + self.executor.spin_until_future_complete( + rclpy.task.Future(), timeout_sec=5 + ) + command.wait_for_shutdown(timeout=10) + # DDS filter passes 'hello' but Python filter rejects it + assert 'hello' not in command.output, ( + 'Python filter should have rejected messages that passed DDS filter') + finally: + self.node.destroy_timer(publish_timer) + self.node.destroy_publisher(publisher) + @launch_testing.markers.retry_on_failure(times=5) def test_echo_raw(self, launch_service, proc_info, proc_output): topic = '/clitest/topic/echo_raw'