Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 33 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -409,25 +409,44 @@ For more information, please refer to https://github.com/ros2/rosbag2/blob/rolli

#### Controlling playback via services

The Rosbag2 player provides the following services for remote control, which can be called via `ros2 service` commandline or from your nodes,
The Rosbag2 player provides the following services for remote control, which can be called via
`ros2 service` commandline or from your nodes.

Time-based `~/play` and `~/resume` requests can be used to coordinate playback across multiple
`rosbag2_transport::Player` instances by scheduling playback state changes for the same time in
the future. Scheduled playback actions are evaluated against the player node clock, so they can be
coordinated with either system time or `/clock` when the player is configured to use simulation
time.

* `~/burst [rosbag2_interfaces/srv/Burst]`
* Can only be used while player is paused, publishes `num_messages` in order as fast as possible, moving forward the play head.
* Can only be used while player is paused, publishes `num_messages` in order as fast as possible,
moving forward the play head.
* `~/get_rate [rosbag2_interfaces/srv/GetRate]`
* Return the current playback rate.
* `~/is_paused [rosbag2_interfaces/srv/IsPaused]`
* Return whether playback is paused.
* `~/pause [rosbag2_interfaces/srv/Pause]`
* Pause playback. Has no effect if already paused.
* `~/play [rosbag2_interfaces/srv/Play]`
* Play from a starting offset timestamp, either until the end, an ending timestamp or for a set duration. Only works when stopped (not paused).
* Play from a starting offset timestamp, either until the end, an ending timestamp or for a set
duration. Only works when stopped (not paused).
* `start_time` schedules playback to begin at a specific future node time.
* If `start_time` is zero, unset, or in the past, playback starts immediately.
* A successful scheduled `play` request returns immediately and playback begins when the player
node clock reaches `start_time`.
* Scheduling a future `play` while another playback is active is allowed; the request succeeds
and the next playback begins once the player is stopped and the scheduled time is reached.
* Response fields:
- `return_code`: `success`, `already_running`, or `failed_to_start`.
- `error_string`: empty on success, otherwise describes the failure.
* `~/play_next [rosbag2_interfaces/srv/PlayNext]`
* Play a single next message from the bag. Only works while paused.
* `~/resume [rosbag2_interfaces/srv/Resume]`
* Resume playback if paused.
* Resume playback if paused, either immediately or at a scheduled future node time.
* The newer `publish_time` and `receive_time` resume modes are recorder-only.
* Player only supports `resume_mode = node_time`.
* `resume_time` is not supported by player and must be zero for player resume requests.
* `resume_time` may be used with `resume_mode = node_time` to schedule a future playback resume.
* If `resume_time` is zero, unset, or in the past, playback resumes immediately.
* `tracking_topic_name` is not supported by player and must be empty.
* If `resume_mode` is `publish_time` or `receive_time`, player rejects the request with
`return_code = invalid_resume_mode` and a descriptive `error_string`.
Expand All @@ -439,11 +458,18 @@ The Rosbag2 player provides the following services for remote control, which can
- `return_code`: `success`, `invalid_resume_mode`, or `invalid_tracking_topic`.
- `error_string`: empty on success, otherwise describes the failure.
* `~/seek [rosbag2_interfaces/srv/Seek]`
* Change the play head to the specified timestamp. Can be forward or backward in time, the next played message is the next immediately after the seeked timestamp.
* Change the play head to the specified timestamp. Can be forward or backward in time, the next
played message is the next immediately after the seeked timestamp.
* Returns `success = true` when the requested time is within the bag duration and the seek
operation succeeds.
* `~/set_rate [rosbag2_interfaces/srv/SetRate]`
* Sets the rate of playback, for example 2.0 will play messages twice as fast.
* `~/stop [rosbag2_interfaces/srv/Stop]`
* Stop the player, putting the play head in "undefined position" outside the bag. Must call `play` before other operations can be done.
* Stop the player, putting the play head in "undefined position" outside the bag. Must call `play`
before other operations can be done.
* Returns `return_code = 0` on success.
* Returns `return_code = 1` with an `error_string` if playback is already stopped or if stopping
fails.
* `~/toggle_paused [rosbag2_interfaces/srv/TogglePaused]`
* Pause if playing, resume if paused.

Expand Down
19 changes: 17 additions & 2 deletions rosbag2_interfaces/srv/Play.srv
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
# See rosbag2_transport::PlayOptions::start_offset
builtin_interfaces/Time start_offset

# See rosbag2_transport::PlayOptions::playback_duration
builtin_interfaces/Duration playback_duration

# See rosbag2_transport::PlayOptions::playback_until_timestamp
builtin_interfaces/Time playback_until_timestamp

# Timestamp in the future when to start playing.
# If empty or time in the past, playing starts immediately.
builtin_interfaces/Time start_time

---
# returns false when another playback execution is running, otherwise true
bool success
# Return codes for player response.
int32 RETURN_CODE_SUCCESS=0
int32 RETURN_CODE_ALREADY_RUNNING=1
int32 RETURN_CODE_FAILED_TO_START=2

# Return code. Use RETURN_CODE_SUCCESS on success; otherwise use one of the error codes.
int32 return_code

# Error string. Empty if no error occurred.
string error_string
2 changes: 0 additions & 2 deletions rosbag2_interfaces/srv/Resume.srv
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ int32 RESUME_MODE_RECEIVE_TIME=2

# Timestamp in the future when to resume recording/playback.
# If empty or time in the past, resumes recording/playback immediately.
# Note: The resume_time is not supported by player and shall be set to zero when using Resume with
# player.
builtin_interfaces/Time resume_time

# Resume mode to use for the resume_time request.
Expand Down
109 changes: 105 additions & 4 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "rosbag2_transport/player_service_client.hpp"
#include "rosbag2_transport/player_progress_bar.hpp"
#include "rosbag2_transport/reader_writer_factory.hpp"
#include "rosbag2_transport/delayed_action_task_runner.hpp"
#include "rosbag2_transport/readers_manager.hpp"

#include "logging.hpp"
Expand Down Expand Up @@ -311,6 +312,25 @@ class PlayerImpl
std::unordered_map<std::string, PlayerActionClientSharedPtr> action_clients_;

private:
using PlayResponse = rosbag2_interfaces::srv::Play::Response;
using PlayRequest = rosbag2_interfaces::srv::Play::Request;

/// \brief Return codes for play operation.
enum class PlayReturnCode : int32_t
{
Success = PlayResponse::RETURN_CODE_SUCCESS,
AlreadyRunning = PlayResponse::RETURN_CODE_ALREADY_RUNNING,
FailedToStart = PlayResponse::RETURN_CODE_FAILED_TO_START
};

/// \brief Convert enum class to its underlying type.
template<typename E>
static constexpr std::underlying_type_t<E> to_underlying_type(E e) noexcept
{
static_assert(std::is_enum<E>::value, "E must be an enum type");
return static_cast<std::underlying_type_t<E>>(e);
}

rosbag2_storage::SerializedBagMessageSharedPtr take_next_message_from_queue();
void load_storage_content();

Expand Down Expand Up @@ -344,6 +364,19 @@ class PlayerImpl
rcutils_time_point_value_t get_message_order_timestamp(
const rosbag2_storage::SerializedBagMessageSharedPtr & message) const;

/// \brief Convert a builtin_interfaces::msg::Time to an optional rclcpp::Time.
/// \param time_msg The time message to convert.
/// \return An optional rclcpp::Time. If time_msg is zero, returns std::nullopt.
std::optional<rclcpp::Time> optional_time_from_request(
const builtin_interfaces::msg::Time & time_msg) const;

static void set_service_play_resp_success(
rosbag2_interfaces::srv::Play::Response & response) noexcept;
static void set_service_play_resp_error(
rosbag2_interfaces::srv::Play::Response & response,
PlayReturnCode code,
const std::string & error_string) noexcept;

static constexpr double read_ahead_lower_bound_percentage_ = 0.9;
static const std::chrono::milliseconds queue_read_wait_period_;
std::atomic_bool cancel_wait_for_next_message_{false};
Expand Down Expand Up @@ -375,9 +408,11 @@ class PlayerImpl
}

Player * owner_;
DelayedActionTaskRunner action_task_runner_;
rosbag2_transport::PlayOptions play_options_;
static constexpr const char * kDefaultReadSplitTopicName = "events/read_split";
rcutils_time_point_value_t play_until_timestamp_ = -1;

LockedPriorityQueue<rosbag2_storage::SerializedBagMessageSharedPtr> message_queue_;
using BagMessageComparator =
LockedPriorityQueue<rosbag2_storage::SerializedBagMessageSharedPtr>::Comparator;
Expand Down Expand Up @@ -477,6 +512,7 @@ PlayerImpl::PlayerImpl(
const rosbag2_transport::PlayOptions & play_options)
: readers_(std::make_unique<ReadersManager>(std::move(readers_with_options))),
owner_(owner),
action_task_runner_(owner_),
play_options_(play_options),
message_queue_(get_bag_message_comparator(play_options_.message_order)),
keyboard_handler_(std::move(keyboard_handler)),
Expand Down Expand Up @@ -560,6 +596,9 @@ PlayerImpl::PlayerImpl(
prepare_publishers();
configure_play_until_timestamp();

// Starting delayed action task runner
action_task_runner_.start();

create_control_services();
add_keyboard_callbacks();
progress_bar_->print_help_str();
Expand All @@ -576,6 +615,8 @@ PlayerImpl::~PlayerImpl()
}
}

// Stopping delayed action task runner
action_task_runner_.stop();
// Force to stop playback to avoid hangout in case of unexpected exception or when smart
// pointer to the player object goes out of scope
stop();
Expand Down Expand Up @@ -2132,7 +2173,18 @@ void PlayerImpl::create_control_services()
response->error_string = "tracking_topic_name is not supported by player Resume service.";
return;
}
owner_->resume();
// Convert builtin_interfaces::msg::Time to optional rclcpp::Time
auto resume_time = optional_time_from_request(request->resume_time);

// Execute immediately if no time specified or time is in the past/now
if (!resume_time.has_value() || *resume_time <= owner_->now()) {
owner_->resume();
} else {
auto action_task = [this]() {
owner_->resume();
};
action_task_runner_.schedule(*resume_time, std::move(action_task), "Resume playback");
}
});
srv_toggle_paused_ = owner_->create_service<rosbag2_interfaces::srv::TogglePaused>(
"~/toggle_paused",
Expand Down Expand Up @@ -2174,10 +2226,35 @@ void PlayerImpl::create_control_services()
{
play_options_.start_offset = rclcpp::Time(request->start_offset).nanoseconds();
play_options_.playback_duration = rclcpp::Duration(request->playback_duration);
play_options_.playback_until_timestamp =
rclcpp::Time(request->playback_until_timestamp).nanoseconds();
rcl_time_point_value_t nanosec = RCL_S_TO_NS(static_cast<int64_t>
(request->playback_until_timestamp.sec)) + request->playback_until_timestamp.nanosec;
play_options_.playback_until_timestamp = nanosec;
configure_play_until_timestamp();
response->success = owner_->play();

// Check if start_time is specified for scheduling
auto start_time = optional_time_from_request(request->start_time);

if (!start_time.has_value() || *start_time <= owner_->now()) {
try {
const bool play_result = owner_->play();
if (play_result) {
set_service_play_resp_success(*response);
} else {
set_service_play_resp_error(
*response, PlayReturnCode::AlreadyRunning, "Player is already running.");
}
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(owner_->get_logger(),
"Failed to start playback immediately. \nError: " << e.what());
set_service_play_resp_error(*response, PlayReturnCode::FailedToStart, e.what());
}
} else {
auto action_task = [this]() {
(void)owner_->play();
};
action_task_runner_.schedule(*start_time, std::move(action_task), "Start playback");
set_service_play_resp_success(*response);
}
});
srv_play_next_ = owner_->create_service<rosbag2_interfaces::srv::PlayNext>(
"~/play_next",
Expand Down Expand Up @@ -2242,6 +2319,30 @@ void PlayerImpl::configure_play_until_timestamp()
}
}

std::optional<rclcpp::Time> PlayerImpl::optional_time_from_request(
const builtin_interfaces::msg::Time & time_msg) const
{
if (time_msg.sec == 0 && time_msg.nanosec == 0) {
return std::nullopt;
}
return rclcpp::Time(time_msg, owner_->get_clock()->get_clock_type());
}

void PlayerImpl::set_service_play_resp_success(PlayResponse & response) noexcept
{
response.return_code = to_underlying_type(PlayReturnCode::Success);
response.error_string.clear();
}

void PlayerImpl::set_service_play_resp_error(
PlayResponse & response,
PlayReturnCode code,
const std::string & error_string) noexcept
{
response.return_code = to_underlying_type(code);
response.error_string = error_string;
}

inline bool PlayerImpl::shall_stop_at_timestamp(
const rcutils_time_point_value_t & msg_timestamp) const
{
Expand Down
10 changes: 6 additions & 4 deletions rosbag2_transport/test/rosbag2_transport/mock_player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,17 @@ class MockPlayer : public rosbag2_transport::Player
std::unique_ptr<rosbag2_cpp::Reader> reader,
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_transport::PlayOptions & play_options,
const std::string & node_name = "rosbag2_mock_player")
: Player(std::move(reader), storage_options, play_options, node_name)
const std::string & node_name = "rosbag2_mock_player",
const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions())
: Player(std::move(reader), storage_options, play_options, node_name, node_options)
{}

MockPlayer(
std::vector<rosbag2_transport::Player::reader_storage_options_pair_t> && input_bags,
const rosbag2_transport::PlayOptions & play_options,
const std::string & node_name = "rosbag2_mock_player")
: Player(std::move(input_bags), play_options, node_name)
const std::string & node_name = "rosbag2_mock_player",
const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions())
: Player(std::move(input_bags), play_options, node_name, node_options)
{}

explicit MockPlayer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class ComposablePlayerIntegrationTests : public CompositionManagerTestFixture
auto result = std::make_shared<srvPlay::Response>();
EXPECT_NO_THROW({result = play_future.get();});
EXPECT_TRUE(result);
return result->success;
return result->return_code == rosbag2_interfaces::srv::Play::Response::RETURN_CODE_SUCCESS;
}

protected:
Expand Down
Loading
Loading