Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions src/ros2_medkit_action_status_bridge/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ generically, by observing the goal-status topic.
## What it does

- Discovers every action on the graph by scanning for `*/_action/status`
topics (re-scanned on a timer to catch actions that appear later, and to drop
actions that vanish, e.g. a Nav2 lifecycle deactivate or a one-shot node).
topics (re-scanned on a timer to catch actions that appear later, to drop
actions that vanish, e.g. a Nav2 lifecycle deactivate or a one-shot node, and
to retry any fault whose delivery to the FaultManager was deferred).
- `ABORTED (6)` -> fault (`SEVERITY_ERROR` by default).
- `CANCELED (5)` -> fault only if `canceled_is_fault` (off by default; cancel is
usually intentional). When enabled it emits a `_CANCELED` code.
Expand All @@ -49,6 +50,16 @@ another goal is still failed, and a dropped terminal message cannot leave a
fault stuck. Per-goal dedup (keyed on `goal_id:status`) only suppresses
duplicate log lines; it never gates the transition.

The bridge tracks the latest observed state (the *desired* state) separately
from what the FaultManager was actually told. The transition is committed only
once the report is delivered: if the FaultManager service is not discovered yet
- e.g. an action's status topic is `transient_local` (latched) and its terminal
status reaches the bridge during the startup discovery window, before the report
client connects - the transition stays pending and is retried on the next
rescan, rather than the high-severity fault being silently dropped. (The latched
status is delivered to a subscription only once, so without this retry the
dropped report would never recover.)

## Scope: the terminal verdict, not the reason

This bridge delivers the generic "it aborted" event. The action-specific
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,35 @@ class ActionStatusBridgeNode : public rclcpp::Node {
/// the goals in the array does not affect the result (any failing goal wins).
static ActionState derive_state(const action_msgs::msg::GoalStatusArray & msg, bool canceled_is_fault);

/// Update per-action state from a message and act on the transition only.
/// Returns the state that was reported on (kFailed on raise, kHealthy on
/// heal) or kUnknown when nothing was reported. Side-effect free w.r.t.
/// reporting when `reporter` is null (test seam): the transition decision and
/// stored per-action state still update, so tests assert on the return value.
/// Update the desired per-action state from a message and reconcile it,
/// acting on the transition only. Returns the state that was reported on
/// (kFailed on raise, kHealthy on heal) or kUnknown when nothing was reported
/// - including when the report could not be delivered yet (the FaultManager
/// service is not discovered), in which case the transition stays pending and
/// is retried on the next rescan. A null `reporter` is the unit-test seam:
/// delivery is a no-op treated as successful, so the transition decision and
/// stored per-action state still update and tests assert on the return value.
ActionState apply_message(const std::string & action_name, const action_msgs::msg::GoalStatusArray & msg,
ros2_medkit_fault_reporter::FaultReporter * reporter);

private:
void rescan_actions();
void status_callback(const std::string & action_name, const action_msgs::msg::GoalStatusArray::ConstSharedPtr & msg);

/// Deliver the pending raise/heal for one action when the desired state
/// differs from what the FaultManager was last told AND the report is
/// deliverable (a null reporter is the test seam, treated as deliverable; a
/// real reporter must have a discovered service). Commits `last_reported_state_`
/// only after delivery, so a report that cannot go out yet stays pending and
/// is retried later instead of being silently dropped. Returns the state
/// reported on this call (kFailed/kHealthy) or kUnknown when nothing was sent.
ActionState reconcile(const std::string & action_name, ros2_medkit_fault_reporter::FaultReporter * reporter);

/// Re-attempt every action whose desired state has not been delivered to the
/// FaultManager yet. Driven from the rescan timer so a report dropped during
/// the startup discovery window is retried once the service is discovered.
void reconcile_pending();

/// Get (creating on first use) the FaultReporter for an action. The reporter's
/// source_id is fixed when first created: the resolved server FQN if discovery
/// has settled, otherwise the action name as a fallback so the fault still fires
Expand Down Expand Up @@ -142,7 +159,20 @@ class ActionStatusBridgeNode : public rclcpp::Node {
std::map<std::string, std::unique_ptr<ros2_medkit_fault_reporter::FaultReporter>> reporters_;
std::mutex reporters_mutex_;

// Last reported action-level state, keyed by action name. Drives transitions.
// Desired (latest observed) action-level state derived from status messages:
// the source of truth the bridge tries to make the FaultManager reflect.
// `canceled` records whether a failure was a CANCELED (vs ABORTED), to pick
// the right fault code when the report is delivered later.
struct DesiredState {
ActionState net{ActionState::kUnknown};
bool canceled{false};
};

// Desired vs last-reported state, both keyed by action name and guarded by
// state_mutex_. The transition raises/heals only when they differ; a report
// that cannot be delivered yet leaves last_reported_state_ behind desired_,
// so reconcile_pending() retries it on the next rescan instead of dropping it.
std::map<std::string, DesiredState> desired_state_;
std::map<std::string, ActionState> last_reported_state_;
std::mutex state_mutex_;

Expand Down Expand Up @@ -181,6 +211,19 @@ class ActionStatusBridgeTestAccess {
bool is_watched(const std::string & action_name) const;
bool has_state(const std::string & action_name) const;

/// True if the FaultManager was actually told this action is failed (the
/// last-reported state was committed, i.e. the report was delivered).
bool reported_failed(const std::string & action_name) const;

/// True if a failed state is observed but not yet delivered: the desired state
/// is failed while the last-reported state is not. This is the "pending retry"
/// condition - the fault is remembered, not dropped.
bool pending_failed(const std::string & action_name) const;

/// The FaultReporter for an action (created on first call). Lets a test drive
/// the deferred-delivery path with a real reporter whose service is not ready.
ros2_medkit_fault_reporter::FaultReporter * reporter_for(const std::string & action_name);

/// Identity of the FaultReporter for an action (created on first call). Lets a
/// test assert the reporter is created once and never swapped out.
const void * reporter_identity(const std::string & action_name);
Expand Down
146 changes: 117 additions & 29 deletions src/ros2_medkit_action_status_bridge/src/action_status_bridge_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <chrono>
#include <cstdio>
#include <functional>
#include <vector>

#include "action_msgs/msg/goal_status.hpp"
#include "ros2_medkit_msgs/msg/fault.hpp"
Expand Down Expand Up @@ -188,6 +189,12 @@ void ActionStatusBridgeNode::rescan_actions() {
}

prune_vanished(present);

// Re-attempt any fault delivery that was deferred because the FaultManager
// service was not discovered when the (latched) status arrived during startup.
// The latched message is delivered only once, so without this a dropped report
// would never be retried.
reconcile_pending();
}

void ActionStatusBridgeNode::prune_vanished(const std::map<std::string, std::string> & present_topics) {
Expand Down Expand Up @@ -223,6 +230,7 @@ void ActionStatusBridgeNode::prune_vanished(const std::map<std::string, std::str
{
std::lock_guard<std::mutex> lock(state_mutex_);
last_reported_state_.erase(action_name);
desired_state_.erase(action_name);
Comment thread
bburda marked this conversation as resolved.
}
RCLCPP_INFO(get_logger(), "Action '%s' vanished; dropped (topic %s)", action_name.c_str(), topic.c_str());
it = subs_.erase(it);
Expand Down Expand Up @@ -256,49 +264,106 @@ ActionStatusBridgeNode::apply_message(const std::string & action_name, const act
return ActionState::kUnknown; // no terminal verdict in this message
}

ActionState prev;
// Record the latest observed action-level state as the desired state, then
// reconcile it against what the FaultManager was last told. A SUCCEEDED while
// healing is disabled leaves the desired state untouched, so a prior failure
// stays failed (the no-heal contract).
{
std::lock_guard<std::mutex> lock(state_mutex_);
auto it = last_reported_state_.find(action_name);
// Absent state is treated as healthy: a first SUCCEEDED must not heal a
// fault that was never raised.
prev = (it == last_reported_state_.end()) ? ActionState::kHealthy : it->second;
auto & desired = desired_state_[action_name];
if (net == ActionState::kFailed) {
desired.net = ActionState::kFailed;
desired.canceled = describe_failure_is_cancel(msg, canceled_is_fault_);
} else if (heal_on_succeeded_) { // net == kHealthy
desired.net = ActionState::kHealthy;
}
}

// Only the two real transitions act: raise on (not failed)->failed, heal on
// failed->(healthy). Everything else is a no-op.
if (net == ActionState::kFailed && prev != ActionState::kFailed) {
const bool canceled = describe_failure_is_cancel(msg, canceled_is_fault_);
return reconcile(action_name, reporter);
}

ActionStatusBridgeNode::ActionState
ActionStatusBridgeNode::reconcile(const std::string & action_name,
ros2_medkit_fault_reporter::FaultReporter * reporter) {
DesiredState desired;
ActionState reported;
{
std::lock_guard<std::mutex> lock(state_mutex_);
auto dit = desired_state_.find(action_name);
if (dit == desired_state_.end() || dit->second.net == ActionState::kUnknown) {
return ActionState::kUnknown; // nothing desired yet
}
desired = dit->second;
auto rit = last_reported_state_.find(action_name);
// Absent reported state is treated as healthy: a first SUCCEEDED must not
// heal a fault that was never raised.
reported = (rit == last_reported_state_.end()) ? ActionState::kHealthy : rit->second;
}

if (desired.net == reported) {
Comment thread
bburda marked this conversation as resolved.
return ActionState::kUnknown; // already in sync, nothing to report
}

// A real reporter can only deliver once the FaultManager service is
// discovered; until then keep the transition pending (retried by
// reconcile_pending() on the next rescan) instead of dropping it. A null
// reporter is the unit-test decision-only seam: delivery is a no-op that
// still commits the transition.
const bool deliverable = (reporter == nullptr) || reporter->is_service_ready();
Comment thread
bburda marked this conversation as resolved.
if (!deliverable) {
RCLCPP_DEBUG(get_logger(), "FaultManager service not ready; deferring %s for action %s",
desired.net == ActionState::kFailed ? "fault" : "heal", action_name.c_str());
return ActionState::kUnknown;
}

if (desired.net == ActionState::kFailed) {
if (reporter != nullptr) {
const std::string code = fault_code_for(action_name, canceled);
const std::string code = fault_code_for(action_name, desired.canceled);
reporter->report(code, aborted_severity_,
std::string("Action ") + action_name + (canceled ? " canceled" : " aborted"));
std::string("Action ") + action_name + (desired.canceled ? " canceled" : " aborted"));
RCLCPP_INFO(get_logger(), "Action %s -> fault %s", action_name.c_str(), code.c_str());
}
{
std::lock_guard<std::mutex> lock(state_mutex_);
last_reported_state_[action_name] = ActionState::kFailed;
}
std::lock_guard<std::mutex> lock(state_mutex_);
last_reported_state_[action_name] = ActionState::kFailed;
return ActionState::kFailed;
}

if (net == ActionState::kHealthy && prev == ActionState::kFailed && heal_on_succeeded_) {
if (reporter != nullptr) {
// Heal both possible codes; only the one previously raised exists.
reporter->report_passed(fault_code_for(action_name, false));
if (canceled_is_fault_) {
reporter->report_passed(fault_code_for(action_name, true));
}
RCLCPP_INFO(get_logger(), "Action %s healed", action_name.c_str());
}
{
std::lock_guard<std::mutex> lock(state_mutex_);
last_reported_state_[action_name] = ActionState::kHealthy;
// desired.net == kHealthy, and healing was enabled when it was set.
if (reporter != nullptr) {
// Heal both possible codes; only the one previously raised exists.
reporter->report_passed(fault_code_for(action_name, false));
if (canceled_is_fault_) {
reporter->report_passed(fault_code_for(action_name, true));
}
return ActionState::kHealthy;
RCLCPP_INFO(get_logger(), "Action %s healed", action_name.c_str());
}
std::lock_guard<std::mutex> lock(state_mutex_);
last_reported_state_[action_name] = ActionState::kHealthy;
return ActionState::kHealthy;
}

return ActionState::kUnknown;
void ActionStatusBridgeNode::reconcile_pending() {
// Collect actions whose desired state has not been delivered to the
// FaultManager, then reconcile each outside the lock (reconcile + report()
// must not hold state_mutex_). reporter_for() is only called for actions with
// a real pending transition, so this never creates reporters speculatively.
std::vector<std::string> pending;
{
std::lock_guard<std::mutex> lock(state_mutex_);
for (const auto & [action_name, desired] : desired_state_) {
if (desired.net == ActionState::kUnknown) {
continue;
}
auto rit = last_reported_state_.find(action_name);
const ActionState reported = (rit == last_reported_state_.end()) ? ActionState::kHealthy : rit->second;
if (desired.net != reported) {
pending.push_back(action_name);
}
}
}
for (const auto & action_name : pending) {
reconcile(action_name, reporter_for(action_name));
}
}

void ActionStatusBridgeNode::status_callback(const std::string & action_name,
Expand Down Expand Up @@ -505,6 +570,29 @@ bool ActionStatusBridgeTestAccess::has_state(const std::string & action_name) co
return node_->last_reported_state_.find(action_name) != node_->last_reported_state_.end();
}

bool ActionStatusBridgeTestAccess::reported_failed(const std::string & action_name) const {
std::lock_guard<std::mutex> lock(node_->state_mutex_);
auto it = node_->last_reported_state_.find(action_name);
return it != node_->last_reported_state_.end() && it->second == ActionStatusBridgeNode::ActionState::kFailed;
}

bool ActionStatusBridgeTestAccess::pending_failed(const std::string & action_name) const {
std::lock_guard<std::mutex> lock(node_->state_mutex_);
auto dit = node_->desired_state_.find(action_name);
if (dit == node_->desired_state_.end() || dit->second.net != ActionStatusBridgeNode::ActionState::kFailed) {
return false;
}
auto rit = node_->last_reported_state_.find(action_name);
const auto reported =
(rit == node_->last_reported_state_.end()) ? ActionStatusBridgeNode::ActionState::kHealthy : rit->second;
return reported != ActionStatusBridgeNode::ActionState::kFailed;
}

ros2_medkit_fault_reporter::FaultReporter *
ActionStatusBridgeTestAccess::reporter_for(const std::string & action_name) {
return node_->reporter_for(action_name);
}

const void * ActionStatusBridgeTestAccess::reporter_identity(const std::string & action_name) {
return node_->reporter_for(action_name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,27 @@ TEST_F(ActionStatusBridgeTest, ReporterFor_StickyCreatedOnceNeverSwapped) {
EXPECT_EQ(access.reporter_identity("/nav"), first);
}

// --- deferred delivery: a fault that cannot reach the FaultManager yet (its
// service is not discovered) must be kept pending and retried, never silently
// dropped. This is the startup discovery race a latched ABORTED status hits. ---

TEST_F(ActionStatusBridgeTest, Reconcile_FaultDeferredWhenServiceUnavailable) {
auto node = std::make_shared<ActionStatusBridgeNode>();
ActionStatusBridgeTestAccess access(node.get());
auto * reporter = access.reporter_for("/nav");
ASSERT_NE(reporter, nullptr);
// No FaultManager runs in this unit test, so the report service is not ready.
ASSERT_FALSE(reporter->is_service_ready());

// An ABORTED whose report cannot be delivered must NOT commit the reported
// state (otherwise a later SUCCEEDED would "heal" a fault the manager never
// received) and must stay pending so a rescan can retry it once the service
// appears.
EXPECT_EQ(node->apply_message("/nav", one_goal(1, GoalStatus::STATUS_ABORTED), reporter), State::kUnknown);
EXPECT_FALSE(access.reported_failed("/nav")); // not falsely recorded as delivered
EXPECT_TRUE(access.pending_failed("/nav")); // remembered for retry, not dropped
Comment thread
bburda marked this conversation as resolved.
}

// --- rescan add + prune ---

TEST_F(ActionStatusBridgeTest, RescanPrune_DropsVanishedAction) {
Expand Down
Loading