Skip to content
Open
2 changes: 1 addition & 1 deletion src/rai_core/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "rai_core"
version = "2.11.1"
version = "2.11.2"
description = "Core functionality for RAI framework"
readme = "README.md"
requires-python = ">=3.10,<3.13"
Expand Down
9 changes: 9 additions & 0 deletions src/rai_core/rai/communication/ros2/api/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,12 @@ def create_service(
handle = str(uuid.uuid4())
self._services[handle] = service
return handle

def shutdown(self) -> None:
for service in self._services.values():
service.destroy()
self._services.clear()
with self._persistent_clients_lock:
for client in self._persistent_clients.values():
client.destroy()
self._persistent_clients.clear()
21 changes: 12 additions & 9 deletions src/rai_core/rai/communication/ros2/api/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,25 +60,23 @@ def __init__(
"""
self._node = node
self._logger = node.get_logger()
self._publishers: Dict[str, Publisher] = {}

# TODO: These fields are a workaround to prevent subscriber destruction,
# which often fails as described in https://github.com/ros2/rclpy/issues/1142
# By setting destroy_subscriptions to False, subscribers are not destroyed.
# While this may lead to memory/performance issues, it's preferable to
# preventing node crashes.
self._last_msg: Dict[str, Tuple[float, Any]] = {}
self._subscriptions: Dict[str, rclpy.node.Subscription] = {}
self._destroy_subscribers: bool = destroy_subscribers
self.node = node
self.subscriptions: Dict[str, Subscription] = {}
self.publishers: Dict[str, Publisher] = {}
self._subscriptions: Dict[str, Subscription] = {}
self._publishers: Dict[str, Publisher] = {}

def subscriber_exists(self, topic: str) -> bool:
return topic in self.subscriptions
return topic in self._subscriptions

def publisher_exists(self, topic: str) -> bool:
return topic in self.publishers
return topic in self._publishers

def create_subscriber(
self,
Expand All @@ -102,7 +100,7 @@ def create_subscriber(
subscription = self.node.create_subscription(
topic=topic, msg_type=msg_cls, callback=callback, qos_profile=qos
)
self.subscriptions[topic] = subscription
self._subscriptions[topic] = subscription
return subscription

def create_publisher(
Expand All @@ -124,7 +122,7 @@ def create_publisher(
publisher = self.node.create_publisher(
topic=topic, msg_type=msg_cls, qos_profile=qos
)
self.publishers[topic] = publisher
self._publishers[topic] = publisher
return publisher

def get_topic_names_and_types(
Expand Down Expand Up @@ -244,6 +242,11 @@ def _verify_publisher_exists(self, topic: str) -> List[TopicEndpointInfo]:
return topic_endpoints

def shutdown(self) -> None:
"""Cleanup publishers when object is destroyed."""
"""Cleanup publishers and subscribers when object is destroyed."""
for publisher in self._publishers.values():
publisher.destroy()
self._publishers.clear()

for subscription in self._subscriptions.values():
subscription.destroy()
self._subscriptions.clear()
13 changes: 10 additions & 3 deletions src/rai_core/rai/communication/ros2/connectors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,12 @@ def __init__(
If an invalid executor type is provided.
"""
super().__init__()
self._owns_rclpy_context: bool = False
if node_name is None:
node_name = f"rai_ros2_connector_{str(uuid.uuid4())[-12:]}"
if not rclpy.ok():
rclpy.init()
self._owns_rclpy_context = True
self.logger.warning(
"Auto-initializing ROS2, but manual initialization is recommended. "
"For better control and predictability, call rclpy.init() or ROS2Context before creating this connector."
Expand Down Expand Up @@ -464,11 +466,12 @@ def get_transform(
raise LookupException(
f"Could not find transform from {source_frame} to {target_frame} in {timeout_sec} seconds"
)
seconds, nanoseconds = divmod(timeout_sec * 1e9, 1e9)
transform: TransformStamped = self._tf_buffer.lookup_transform(
target_frame,
source_frame,
rclpy.time.Time(),
timeout=Duration(seconds=int(timeout_sec)),
timeout=Duration(seconds=int(seconds), nanoseconds=int(nanoseconds)),
)

return transform
Expand Down Expand Up @@ -563,10 +566,14 @@ def shutdown(self):
4. Shuts down the topic API
5. Shuts down the executor
6. Joins the executor thread
7. Shuts down ROS2 context only if this connector initialized it
"""
self._tf_listener.unregister()
self._node.destroy_node()
self._actions_api.shutdown()
self._topic_api.shutdown()
self._service_api.shutdown()
self._actions_api.shutdown()
self._node.destroy_node()
self._executor.shutdown()
self._thread.join()
if self._owns_rclpy_context:
rclpy.shutdown()
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading