Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,50 +110,62 @@ class PublicationManager
std::chrono::duration<DurationRepT, DurationT> timeout = std::chrono::seconds(10),
size_t n_subscribers_to_match = 1)
{
rclcpp::PublisherBase::SharedPtr publisher = nullptr;
for (const auto pub : publishers_) {
if (!std::strcmp(
std::get<rclcpp::PublisherBase::SharedPtr>(pub)->get_topic_name(),
topic_name))
{
publisher = std::get<rclcpp::PublisherBase::SharedPtr>(pub);
break;
}
}

auto publisher = get_publisher_for_topic(topic_name);
if (publisher == nullptr) {
return false;
}

if (publisher->get_subscription_count() +
publisher->get_intra_process_subscription_count() >= n_subscribers_to_match)
{
if (publisher_has_matching_subscriptions(publisher, n_subscribers_to_match)) {
return true;
}

auto sleep_time = std::chrono::milliseconds(10);

if (timeout < std::chrono::seconds(1)) {
sleep_time = timeout;
}
auto graph_event = pub_node_->get_graph_event();
(void)graph_event->check_and_clear();

using clock = std::chrono::steady_clock;
auto start = clock::now();
const auto deadline = clock::now() + std::chrono::duration_cast<clock::duration>(timeout);
constexpr auto max_wait_slice = std::chrono::milliseconds(100);

do {
std::this_thread::sleep_for(sleep_time);

if (publisher->get_subscription_count() +
publisher->get_intra_process_subscription_count() >= n_subscribers_to_match)
{
while (clock::now() < deadline) {
if (publisher_has_matching_subscriptions(publisher, n_subscribers_to_match)) {
return true;
}
} while ((clock::now() - start) < timeout);

return false;
const auto now_time = clock::now();
if (now_time < deadline) {
auto remaining =
std::chrono::duration_cast<std::chrono::nanoseconds>(deadline - now_time);
if (remaining > max_wait_slice) {
remaining = max_wait_slice;
}
pub_node_->wait_for_graph_change(graph_event, remaining);
}
(void)graph_event->check_and_clear();
}

return publisher_has_matching_subscriptions(publisher, n_subscribers_to_match);
}

private:
rclcpp::PublisherBase::SharedPtr get_publisher_for_topic(const char * topic_name) const
{
for (const auto & pub : publishers_) {
auto publisher = std::get<rclcpp::PublisherBase::SharedPtr>(pub);
if (!std::strcmp(publisher->get_topic_name(), topic_name)) {
return publisher;
}
}
return nullptr;
}

static bool publisher_has_matching_subscriptions(
const rclcpp::PublisherBase::SharedPtr & publisher,
size_t n_subscribers_to_match)
{
return publisher->get_subscription_count() +
publisher->get_intra_process_subscription_count() >= n_subscribers_to_match;
}

std::shared_ptr<rclcpp::Node> pub_node_;
using PublicationF = std::function<void (bool)>;
std::vector<std::pair<rclcpp::PublisherBase::SharedPtr, PublicationF>> publishers_;
Expand Down
Loading