diff --git a/rosbag2_test_common/include/rosbag2_test_common/publication_manager.hpp b/rosbag2_test_common/include/rosbag2_test_common/publication_manager.hpp index 9f1a03d902..b5cc6f3c64 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/publication_manager.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/publication_manager.hpp @@ -110,50 +110,62 @@ class PublicationManager std::chrono::duration timeout = std::chrono::seconds(10), size_t n_subscribers_to_match = 1) { - rclcpp::PublisherBase::SharedPtr publisher = nullptr; - for (const auto pub : publishers_) { - if (!std::strcmp( - std::get(pub)->get_topic_name(), - topic_name)) - { - publisher = std::get(pub); - break; - } - } - + auto publisher = get_publisher_for_topic(topic_name); if (publisher == nullptr) { return false; } - if (publisher->get_subscription_count() + - publisher->get_intra_process_subscription_count() >= n_subscribers_to_match) - { + if (publisher_has_matching_subscriptions(publisher, n_subscribers_to_match)) { return true; } - auto sleep_time = std::chrono::milliseconds(10); - - if (timeout < std::chrono::seconds(1)) { - sleep_time = timeout; - } + auto graph_event = pub_node_->get_graph_event(); + (void)graph_event->check_and_clear(); using clock = std::chrono::steady_clock; - auto start = clock::now(); + const auto deadline = clock::now() + std::chrono::duration_cast(timeout); + constexpr auto max_wait_slice = std::chrono::milliseconds(100); - do { - std::this_thread::sleep_for(sleep_time); - - if (publisher->get_subscription_count() + - publisher->get_intra_process_subscription_count() >= n_subscribers_to_match) - { + while (clock::now() < deadline) { + if (publisher_has_matching_subscriptions(publisher, n_subscribers_to_match)) { return true; } - } while ((clock::now() - start) < timeout); - return false; + const auto now_time = clock::now(); + if (now_time < deadline) { + auto remaining = + std::chrono::duration_cast(deadline - now_time); + if (remaining > max_wait_slice) { + remaining = max_wait_slice; + } + pub_node_->wait_for_graph_change(graph_event, remaining); + } + (void)graph_event->check_and_clear(); + } + + return publisher_has_matching_subscriptions(publisher, n_subscribers_to_match); } private: + rclcpp::PublisherBase::SharedPtr get_publisher_for_topic(const char * topic_name) const + { + for (const auto & pub : publishers_) { + auto publisher = std::get(pub); + if (!std::strcmp(publisher->get_topic_name(), topic_name)) { + return publisher; + } + } + return nullptr; + } + + static bool publisher_has_matching_subscriptions( + const rclcpp::PublisherBase::SharedPtr & publisher, + size_t n_subscribers_to_match) + { + return publisher->get_subscription_count() + + publisher->get_intra_process_subscription_count() >= n_subscribers_to_match; + } + std::shared_ptr pub_node_; using PublicationF = std::function; std::vector> publishers_;