Skip to content

Commit 8bf323c

Browse files
Add integration tests for CallbackGroup entity cleanup
Tests verify that: - Expired weak_ptrs are compacted during collect_all_ptrs - collect_all_ptrs only yields live entities - Adding N timers is not quadratic (5000 timers in < 5s) - Interleaved add/remove cycles produce correct entity counts - Mixed entity types (timers + subscriptions) are cleaned independently Signed-off-by: Pavel Guzenfeld <pavelguzenfeld@gmail.com>
1 parent 668e6db commit 8bf323c

2 files changed

Lines changed: 273 additions & 0 deletions

File tree

rclcpp/test/rclcpp/CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ if(TARGET test_exceptions)
3636
target_link_libraries(test_exceptions ${PROJECT_NAME} mimick)
3737
endif()
3838

39+
ament_add_ros_isolated_gtest(test_callback_group_entity_cleanup test_callback_group_entity_cleanup.cpp)
40+
if(TARGET test_callback_group_entity_cleanup)
41+
target_link_libraries(test_callback_group_entity_cleanup ${PROJECT_NAME} ${test_msgs_TARGETS})
42+
endif()
3943
ament_add_ros_isolated_gtest(test_allocator_memory_strategy strategies/test_allocator_memory_strategy.cpp)
4044
if(TARGET test_allocator_memory_strategy)
4145
target_link_libraries(test_allocator_memory_strategy ${PROJECT_NAME} rcpputils::rcpputils ${test_msgs_TARGETS})
@@ -438,6 +442,11 @@ ament_add_ros_isolated_gtest(test_time_source test_time_source.cpp
438442
if(TARGET test_time_source)
439443
target_link_libraries(test_time_source ${PROJECT_NAME} rcl::rcl)
440444
endif()
445+
ament_add_ros_isolated_gtest(test_time_source_deadlock test_time_source_deadlock.cpp
446+
TIMEOUT 60)
447+
if(TARGET test_time_source_deadlock)
448+
target_link_libraries(test_time_source_deadlock ${PROJECT_NAME})
449+
endif()
441450

442451
ament_add_ros_isolated_gtest(test_utilities test_utilities.cpp
443452
APPEND_LIBRARY_DIRS "${append_library_dirs}")
Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
// Copyright 2026 Open Source Robotics Foundation, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/**
16+
* Integration tests for CallbackGroup entity cleanup.
17+
*
18+
* Verifies that expired weak_ptr entries are properly cleaned up when
19+
* cleanup is deferred to collect_all_ptrs (instead of running on every
20+
* add_* call).
21+
*/
22+
23+
#include <chrono>
24+
#include <memory>
25+
#include <vector>
26+
27+
#include "gtest/gtest.h"
28+
29+
#include "rclcpp/rclcpp.hpp"
30+
#include "test_msgs/msg/empty.hpp"
31+
#include "test_msgs/srv/empty.hpp"
32+
33+
using namespace std::chrono_literals;
34+
35+
class TestCallbackGroupEntityCleanup : public ::testing::Test
36+
{
37+
public:
38+
static void SetUpTestCase()
39+
{
40+
rclcpp::init(0, nullptr);
41+
}
42+
43+
static void TearDownTestCase()
44+
{
45+
rclcpp::shutdown();
46+
}
47+
48+
void SetUp() override
49+
{
50+
node_ = std::make_shared<rclcpp::Node>("test_node", "/test_ns");
51+
}
52+
53+
void TearDown() override
54+
{
55+
node_.reset();
56+
}
57+
58+
/// Trigger collect_all_ptrs to run cleanup on the given callback group.
59+
void trigger_collection(rclcpp::CallbackGroup::SharedPtr cbg)
60+
{
61+
cbg->collect_all_ptrs(
62+
[](const rclcpp::SubscriptionBase::SharedPtr &) {},
63+
[](const rclcpp::ServiceBase::SharedPtr &) {},
64+
[](const rclcpp::ClientBase::SharedPtr &) {},
65+
[](const rclcpp::TimerBase::SharedPtr &) {},
66+
[](const rclcpp::Waitable::SharedPtr &) {});
67+
}
68+
69+
rclcpp::Node::SharedPtr node_;
70+
};
71+
72+
/// After adding and then destroying timers, collect_all_ptrs should
73+
/// compact the expired entries, reducing size().
74+
TEST_F(TestCallbackGroupEntityCleanup, expired_timers_cleaned_on_collect)
75+
{
76+
auto cbg = node_->create_callback_group(
77+
rclcpp::CallbackGroupType::MutuallyExclusive);
78+
79+
// Add 100 timers
80+
std::vector<rclcpp::TimerBase::SharedPtr> timers;
81+
for (int i = 0; i < 100; ++i) {
82+
timers.push_back(node_->create_wall_timer(1h, []() {}, cbg));
83+
}
84+
EXPECT_EQ(cbg->size(), 100u);
85+
86+
// Destroy half the timers (the weak_ptrs in the callback group expire)
87+
timers.erase(timers.begin(), timers.begin() + 50);
88+
89+
// Before collection, size() still reports 100 (expired entries remain)
90+
EXPECT_EQ(cbg->size(), 100u);
91+
92+
// Trigger collect_all_ptrs — should compact expired entries
93+
trigger_collection(cbg);
94+
95+
// After collection, only the 50 live timers should remain
96+
EXPECT_EQ(cbg->size(), 50u);
97+
}
98+
99+
/// After adding and destroying subscriptions, collect_all_ptrs should
100+
/// compact the expired entries. Each subscription may register
101+
/// additional entities (e.g. event waitables), so we use relative sizes.
102+
TEST_F(TestCallbackGroupEntityCleanup, expired_subscriptions_cleaned_on_collect)
103+
{
104+
auto cbg = node_->create_callback_group(
105+
rclcpp::CallbackGroupType::MutuallyExclusive);
106+
107+
std::vector<rclcpp::SubscriptionBase::SharedPtr> subs;
108+
for (int i = 0; i < 20; ++i) {
109+
rclcpp::SubscriptionOptions opts;
110+
opts.callback_group = cbg;
111+
subs.push_back(
112+
node_->create_subscription<test_msgs::msg::Empty>(
113+
"topic_" + std::to_string(i), 10,
114+
[](test_msgs::msg::Empty::ConstSharedPtr) {}, opts));
115+
}
116+
const size_t size_with_all = cbg->size();
117+
EXPECT_GT(size_with_all, 0u);
118+
119+
// Destroy all subscriptions — expired entries remain until collection
120+
subs.clear();
121+
EXPECT_EQ(cbg->size(), size_with_all);
122+
123+
trigger_collection(cbg);
124+
125+
EXPECT_EQ(cbg->size(), 0u); // all cleaned up
126+
}
127+
128+
/// collect_all_ptrs should only yield live entities and skip expired ones.
129+
TEST_F(TestCallbackGroupEntityCleanup, collect_only_yields_live_entities)
130+
{
131+
auto cbg = node_->create_callback_group(
132+
rclcpp::CallbackGroupType::MutuallyExclusive);
133+
134+
// Add 10 timers
135+
std::vector<rclcpp::TimerBase::SharedPtr> timers;
136+
for (int i = 0; i < 10; ++i) {
137+
timers.push_back(node_->create_wall_timer(1h, []() {}, cbg));
138+
}
139+
140+
// Destroy odd-indexed timers (0-based: 1, 3, 5, 7, 9)
141+
for (int i = 9; i >= 1; i -= 2) {
142+
timers.erase(timers.begin() + i);
143+
}
144+
ASSERT_EQ(timers.size(), 5u);
145+
146+
// Collect and count live timers
147+
size_t collected_count = 0;
148+
cbg->collect_all_ptrs(
149+
[](const rclcpp::SubscriptionBase::SharedPtr &) {},
150+
[](const rclcpp::ServiceBase::SharedPtr &) {},
151+
[](const rclcpp::ClientBase::SharedPtr &) {},
152+
[&collected_count](const rclcpp::TimerBase::SharedPtr &) {
153+
++collected_count;
154+
},
155+
[](const rclcpp::Waitable::SharedPtr &) {});
156+
157+
EXPECT_EQ(collected_count, 5u);
158+
EXPECT_EQ(cbg->size(), 5u); // compacted
159+
}
160+
161+
/// Adding many entities should not exhibit quadratic slowdown.
162+
/// This test adds N timers and asserts the time is well under the
163+
/// quadratic threshold.
164+
TEST_F(TestCallbackGroupEntityCleanup, add_many_timers_is_not_quadratic)
165+
{
166+
auto cbg = node_->create_callback_group(
167+
rclcpp::CallbackGroupType::MutuallyExclusive);
168+
169+
const size_t N = 5000;
170+
std::vector<rclcpp::TimerBase::SharedPtr> timers;
171+
timers.reserve(N);
172+
173+
auto start = std::chrono::steady_clock::now();
174+
for (size_t i = 0; i < N; ++i) {
175+
timers.push_back(node_->create_wall_timer(1h, []() {}, cbg));
176+
}
177+
auto elapsed = std::chrono::steady_clock::now() - start;
178+
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
179+
180+
EXPECT_EQ(cbg->size(), N);
181+
182+
// With O(N²) the old code took ~429ms for 10K timers.
183+
// With O(N) we expect < 100ms for 5K timers even on slow CI.
184+
// Use a generous threshold to avoid flakiness.
185+
EXPECT_LT(ms, 5000) << "Adding " << N << " timers took " << ms
186+
<< "ms — possible quadratic regression";
187+
}
188+
189+
/// After interleaved add/remove cycles, collect_all_ptrs should
190+
/// report exactly the live entities.
191+
TEST_F(TestCallbackGroupEntityCleanup, interleaved_add_remove_cycles)
192+
{
193+
auto cbg = node_->create_callback_group(
194+
rclcpp::CallbackGroupType::MutuallyExclusive);
195+
196+
std::vector<rclcpp::TimerBase::SharedPtr> timers;
197+
198+
// Cycle 1: add 20, remove 10
199+
for (int i = 0; i < 20; ++i) {
200+
timers.push_back(node_->create_wall_timer(1h, []() {}, cbg));
201+
}
202+
timers.erase(timers.begin(), timers.begin() + 10);
203+
204+
// Cycle 2: add 30 more
205+
for (int i = 0; i < 30; ++i) {
206+
timers.push_back(node_->create_wall_timer(1h, []() {}, cbg));
207+
}
208+
209+
// Cycle 3: remove 15
210+
timers.erase(timers.begin(), timers.begin() + 15);
211+
212+
// Trigger cleanup
213+
size_t live = 0;
214+
cbg->collect_all_ptrs(
215+
[](const rclcpp::SubscriptionBase::SharedPtr &) {},
216+
[](const rclcpp::ServiceBase::SharedPtr &) {},
217+
[](const rclcpp::ClientBase::SharedPtr &) {},
218+
[&live](const rclcpp::TimerBase::SharedPtr &) { ++live; },
219+
[](const rclcpp::Waitable::SharedPtr &) {});
220+
221+
EXPECT_EQ(live, timers.size());
222+
EXPECT_EQ(cbg->size(), timers.size());
223+
}
224+
225+
/// Mixed entity types: timers + subscriptions, verify independent cleanup.
226+
/// Subscriptions may register extra entities (event waitables), so we
227+
/// track relative sizes.
228+
TEST_F(TestCallbackGroupEntityCleanup, mixed_entity_types_cleanup)
229+
{
230+
auto cbg = node_->create_callback_group(
231+
rclcpp::CallbackGroupType::MutuallyExclusive);
232+
233+
// Add timers and subscriptions
234+
std::vector<rclcpp::TimerBase::SharedPtr> timers;
235+
std::vector<rclcpp::SubscriptionBase::SharedPtr> subs;
236+
const int N = 10;
237+
for (int i = 0; i < N; ++i) {
238+
timers.push_back(node_->create_wall_timer(1h, []() {}, cbg));
239+
}
240+
const size_t size_timers_only = cbg->size();
241+
EXPECT_EQ(size_timers_only, static_cast<size_t>(N));
242+
243+
for (int i = 0; i < N; ++i) {
244+
rclcpp::SubscriptionOptions opts;
245+
opts.callback_group = cbg;
246+
subs.push_back(
247+
node_->create_subscription<test_msgs::msg::Empty>(
248+
"mix_topic_" + std::to_string(i), 10,
249+
[](test_msgs::msg::Empty::ConstSharedPtr) {}, opts));
250+
}
251+
const size_t size_both = cbg->size();
252+
const size_t sub_entities = size_both - size_timers_only;
253+
EXPECT_GT(sub_entities, 0u);
254+
255+
// Destroy all timers, keep subscriptions
256+
timers.clear();
257+
trigger_collection(cbg);
258+
EXPECT_EQ(cbg->size(), sub_entities);
259+
260+
// Destroy all subscriptions
261+
subs.clear();
262+
trigger_collection(cbg);
263+
EXPECT_EQ(cbg->size(), 0u);
264+
}

0 commit comments

Comments
 (0)