AutoAPMS
Resilient Robot Mission Management
Loading...
Searching...
No Matches
ros_subscriber_node.hpp
1// Copyright 2024 Robin Müller
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#pragma once
16
17#include <boost/signals2.hpp>
18#include <memory>
19#include <string>
20
21#include "auto_apms_behavior_tree_core/exceptions.hpp"
22#include "auto_apms_behavior_tree_core/node/ros_node_context.hpp"
23#include "auto_apms_util/string.hpp"
24#include "behaviortree_cpp/condition_node.h"
25#include "rclcpp/executors.hpp"
26#include "rclcpp/qos.hpp"
27
29{
30
58template <class MessageT>
59class RosSubscriberNode : public BT::ConditionNode
60{
61 using Subscriber = typename rclcpp::Subscription<MessageT>;
62
63 struct SubscriberInstance
64 {
65 SubscriberInstance(
66 rclcpp::Node::SharedPtr node, rclcpp::CallbackGroup::SharedPtr group, const std::string & topic_name,
67 const rclcpp::QoS & qos);
68
69 std::shared_ptr<Subscriber> subscriber;
70 boost::signals2::signal<void(const std::shared_ptr<MessageT>)> broadcaster;
71 std::shared_ptr<MessageT> last_msg;
72 std::string name;
73 };
74
75 using SubscribersRegistry = std::unordered_map<std::string, std::weak_ptr<SubscriberInstance>>;
76
77public:
78 using MessageType = MessageT;
79 using Config = BT::NodeConfig;
80 using Context = RosNodeContext;
81
93 const std::string & instance_name, const Config & config, Context context, const rclcpp::QoS & qos = {10});
94
95 virtual ~RosSubscriberNode() { signal_connection_.disconnect(); }
96
104 static BT::PortsList providedBasicPorts(BT::PortsList addition)
105 {
106 BT::PortsList basic = {BT::InputPort<std::string>("port", "Name of the ROS 2 topic to subscribe to.")};
107 basic.insert(addition.begin(), addition.end());
108 return basic;
109 }
110
115 static BT::PortsList providedPorts() { return providedBasicPorts({}); }
116
127 virtual BT::NodeStatus onTick(const std::shared_ptr<MessageT> & last_msg_ptr);
128
138 virtual BT::NodeStatus onMessageReceived(const MessageT & msg);
139
145 bool createSubscriber(const std::string & topic_name);
146
151 std::string getTopicName() const;
152
153protected:
154 const Context context_;
155 const rclcpp::Logger logger_;
156
157private:
158 BT::NodeStatus tick() override final;
159
160 static std::mutex & registryMutex();
161
162 // contains the fully-qualified name of the node and the name of the topic
163 static SubscribersRegistry & getRegistry();
164
165 const rclcpp::QoS qos_;
166 std::string topic_name_;
167 bool dynamic_client_instance_ = false;
168 std::shared_ptr<SubscriberInstance> sub_instance_;
169 std::shared_ptr<MessageT> last_msg_;
170 boost::signals2::connection signal_connection_;
171 std::string subscriber_key_;
172};
173
174// #####################################################################################################################
175// ################################ DEFINITIONS ##############################################
176// #####################################################################################################################
177
178template <class MessageT>
179inline RosSubscriberNode<MessageT>::SubscriberInstance::SubscriberInstance(
180 rclcpp::Node::SharedPtr node, rclcpp::CallbackGroup::SharedPtr group, const std::string & topic_name,
181 const rclcpp::QoS & qos)
182{
183 rclcpp::SubscriptionOptions option;
184 option.callback_group = group;
185
186 // The callback will broadcast to all the instances of RosSubscriberNode<MessageT>
187 auto callback = [this](const std::shared_ptr<MessageT> msg) {
188 this->last_msg = msg;
189 this->broadcaster(msg);
190 };
191 subscriber = node->create_subscription<MessageT>(topic_name, qos, callback, option);
192 name = topic_name;
193}
194
195template <class MessageT>
197 const std::string & instance_name, const Config & config, Context context, const rclcpp::QoS & qos)
198: BT::ConditionNode{instance_name, config},
199 context_{context},
200 logger_(context.getChildLogger(auto_apms_util::toSnakeCase(instance_name))),
201 qos_{qos}
202{
203 if (const BT::Expected<std::string> expected_name = context_.getCommunicationPortName(this)) {
204 createSubscriber(expected_name.value());
205 } else {
206 // We assume that determining the communication port requires a blackboard pointer, which cannot be evaluated at
207 // construction time. The expression will be evaluated each time before the node is ticked the first time after
208 // successful execution.
209 dynamic_client_instance_ = true;
210 }
211}
212
213template <class MessageT>
214inline bool RosSubscriberNode<MessageT>::createSubscriber(const std::string & topic_name)
215{
216 if (topic_name.empty()) {
217 throw exceptions::RosNodeError(
218 context_.getFullyQualifiedTreeNodeName(this) + " - Argument topic_name is empty when trying to create a client.");
219 }
220
221 // Check if the subscriber with given name is already set up
222 if (sub_instance_ && topic_name == sub_instance_->name) return true;
223
224 std::unique_lock lk(registryMutex());
225
226 rclcpp::Node::SharedPtr node = context_.nh_.lock();
227 if (!node) {
228 throw exceptions::RosNodeError(
229 context_.getFullyQualifiedTreeNodeName(this) +
230 " - The weak pointer to the ROS 2 node expired. The tree node doesn't "
231 "take ownership of it.");
232 }
233 rclcpp::CallbackGroup::SharedPtr group = context_.cb_group_.lock();
234 if (!group) {
235 throw exceptions::RosNodeError(
236 context_.getFullyQualifiedTreeNodeName(this) +
237 " - The weak pointer to the ROS 2 callback group expired. The tree node doesn't "
238 "take ownership of it.");
239 }
240 subscriber_key_ = std::string(node->get_fully_qualified_name()) + "/" + topic_name;
241
242 auto & registry = getRegistry();
243 auto it = registry.find(subscriber_key_);
244 if (it == registry.end() || it->second.expired()) {
245 sub_instance_ = std::make_shared<SubscriberInstance>(node, group, topic_name, qos_);
246 registry.insert({subscriber_key_, sub_instance_});
247 RCLCPP_DEBUG(
248 logger_, "%s - Created subscriber for topic '%s'.", context_.getFullyQualifiedTreeNodeName(this).c_str(),
249 topic_name.c_str());
250 } else {
251 sub_instance_ = it->second.lock();
252 }
253
254 // Check if there was a message received before the creation of this subscriber action
255 if (sub_instance_->last_msg) {
256 last_msg_ = sub_instance_->last_msg;
257 }
258
259 // add "this" as received of the broadcaster
260 signal_connection_ =
261 sub_instance_->broadcaster.connect([this](const std::shared_ptr<MessageT> msg) { last_msg_ = msg; });
262
263 return true;
264}
265
266template <class MessageT>
267inline BT::NodeStatus RosSubscriberNode<MessageT>::tick()
268{
269 if (!rclcpp::ok()) {
270 halt();
271 return BT::NodeStatus::FAILURE;
272 }
273
274 // If client has been set up in derived constructor, event though this constructor couldn't, we discard the intention
275 // of dynamically creating the client
276 if (dynamic_client_instance_ && sub_instance_) {
277 dynamic_client_instance_ = false;
278 }
279
280 // Try again to create the client on first tick if this was not possible during construction or if client should be
281 // created from a blackboard entry on the start of every iteration
282 if (status() == BT::NodeStatus::IDLE && dynamic_client_instance_) {
283 const BT::Expected<std::string> expected_name = context_.getCommunicationPortName(this);
284 if (expected_name) {
285 createSubscriber(expected_name.value());
286 } else {
287 throw exceptions::RosNodeError(
288 context_.getFullyQualifiedTreeNodeName(this) +
289 " - Cannot create the subscriber because the topic name couldn't be resolved using "
290 "the communication port expression specified by the node's "
291 "registration parameters (" +
292 NodeRegistrationOptions::PARAM_NAME_PORT + ": " + context_.registration_options_.port +
293 "). Error message: " + expected_name.error());
294 }
295 }
296
297 if (!sub_instance_) {
298 throw exceptions::RosNodeError(context_.getFullyQualifiedTreeNodeName(this) + " - sub_instance_ is nullptr.");
299 }
300
301 auto check_status = [this](BT::NodeStatus status) {
302 if (!isStatusCompleted(status)) {
303 throw exceptions::RosNodeError(
304 context_.getFullyQualifiedTreeNodeName(this) + " - The callback must return either SUCCESS or FAILURE.");
305 }
306 return status;
307 };
308 auto status = check_status(onTick(last_msg_));
309 last_msg_.reset();
310 return status;
311}
312
313template <class MessageT>
314inline BT::NodeStatus RosSubscriberNode<MessageT>::onTick(const std::shared_ptr<MessageT> & last_msg_ptr)
315{
316 if (!last_msg_ptr) return BT::NodeStatus::FAILURE;
317 return onMessageReceived(*last_msg_ptr);
318}
319
320template <class MessageT>
321inline BT::NodeStatus RosSubscriberNode<MessageT>::onMessageReceived(const MessageT & /*msg*/)
322{
323 return BT::NodeStatus::SUCCESS;
324}
325
326template <class MessageT>
328{
329 if (sub_instance_) return sub_instance_->name;
330 return "unkown";
331}
332
333template <class MessageT>
334inline std::mutex & RosSubscriberNode<MessageT>::registryMutex()
335{
336 static std::mutex sub_mutex;
337 return sub_mutex;
338}
339
340template <class MessageT>
341inline typename RosSubscriberNode<MessageT>::SubscribersRegistry & RosSubscriberNode<MessageT>::getRegistry()
342{
343 static SubscribersRegistry subscribers_registry;
344 return subscribers_registry;
345}
346
347} // namespace auto_apms_behavior_tree::core
Additional parameters specific to ROS 2 determined at runtime by TreeBuilder.
std::string getFullyQualifiedTreeNodeName(const BT::TreeNode *node, bool with_class_name=true) const
Create a string representing the detailed name of a behavior tree node.
Generic behavior tree node wrapper for a ROS 2 subscriber.
RosSubscriberNode(const std::string &instance_name, const Config &config, Context context, const rclcpp::QoS &qos={10})
Constructor.
std::string getTopicName() const
Get the name of the topic name this node subscribes to.
virtual BT::NodeStatus onMessageReceived(const MessageT &msg)
Callback invoked when the node is ticked and a valid message has been received.
static BT::PortsList providedBasicPorts(BT::PortsList addition)
Derived nodes implementing the static method RosSubscriberNode::providedPorts may call this method to...
bool createSubscriber(const std::string &topic_name)
Create the ROS 2 subscriber.
static BT::PortsList providedPorts()
If a behavior tree requires input/output data ports, the developer must define this method accordingl...
virtual BT::NodeStatus onTick(const std::shared_ptr< MessageT > &last_msg_ptr)
Callback invoked when the node is ticked.
Core API for AutoAPMS's behavior tree implementation.
Definition builder.hpp:30
Fundamental helper classes and utility functions.
std::string port
Default port name of the corresponding ROS 2 communication interface.