AutoAPMS
Resilient Robot Mission Management
Loading...
Searching...
No Matches
ros_subscriber_node.hpp
Go to the documentation of this file.
1// Copyright 2024 Robin Müller
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#pragma once
16
17#include <boost/signals2.hpp>
18#include <memory>
19#include <string>
20
21#include "rclcpp/qos.hpp"
22#include "rclcpp/executors.hpp"
25#include "behaviortree_cpp/condition_node.h"
26
28{
29
44template <class MessageT>
45class RosSubscriberNode : public BT::ConditionNode
46{
47 using Subscriber = typename rclcpp::Subscription<MessageT>;
48
49 struct SubscriberInstance
50 {
51 SubscriberInstance(std::shared_ptr<rclcpp::Node> node, const std::string& topic_name, const rclcpp::QoS& qos);
52
53 std::shared_ptr<Subscriber> subscriber;
54 rclcpp::CallbackGroup::SharedPtr callback_group;
55 rclcpp::executors::SingleThreadedExecutor callback_group_executor;
56 boost::signals2::signal<void(const std::shared_ptr<MessageT>)> broadcaster;
57 std::shared_ptr<MessageT> last_msg;
58 };
59
60 using SubscribersRegistry = std::unordered_map<std::string, std::weak_ptr<SubscriberInstance>>;
61
62public:
63 using MessageType = MessageT;
64 using Config = BT::NodeConfig;
66
67 explicit RosSubscriberNode(const std::string& instance_name, const Config& config, const Context& context,
68 const rclcpp::QoS& qos = { 10 });
69
71 {
72 signal_connection_.disconnect();
73 }
74
81 static BT::PortsList providedBasicPorts(BT::PortsList addition)
82 {
83 BT::PortsList basic = { BT::InputPort<std::string>("topic_name", "", "Topic name") };
84 basic.insert(addition.begin(), addition.end());
85 return basic;
86 }
87
92 static BT::PortsList providedPorts()
93 {
94 return providedBasicPorts({});
95 }
96
97 BT::NodeStatus tick() override final;
98
109 virtual BT::NodeStatus onTick(const std::shared_ptr<MessageT>& last_msg_ptr);
110
120 virtual BT::NodeStatus onMessageReceived(const MessageT& msg);
121
122 std::string getFullName() const;
123
124protected:
126
127 static std::mutex& registryMutex();
128
129 // contains the fully-qualified name of the node and the name of the topic
130 static SubscribersRegistry& getRegistry();
131
132 const rclcpp::Logger logger_;
133
134private:
135 const Context context_;
136 const rclcpp::QoS qos_;
137 std::string topic_name_;
138 bool topic_name_should_be_checked_ = false;
139 std::shared_ptr<SubscriberInstance> sub_instance_;
140 std::shared_ptr<MessageT> last_msg_;
141 boost::signals2::connection signal_connection_;
142 std::string subscriber_key_;
143
144 bool createSubscriber(const std::string& topic_name);
145};
146
147//----------------------------------------------------------------
148//---------------------- DEFINITIONS -----------------------------
149//----------------------------------------------------------------
150
151template <class MessageT>
152inline RosSubscriberNode<MessageT>::SubscriberInstance::SubscriberInstance(std::shared_ptr<rclcpp::Node> node,
153 const std::string& topic_name,
154 const rclcpp::QoS& qos)
155{
156 // create a callback group for this particular instance
157 callback_group = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, false);
158 callback_group_executor.add_callback_group(callback_group, node->get_node_base_interface());
159
160 rclcpp::SubscriptionOptions option;
161 option.callback_group = callback_group;
162
163 // The callback will broadcast to all the instances of RosSubscriberNode<MessageT>
164 auto callback = [this](const std::shared_ptr<MessageT> msg) {
165 last_msg = msg;
166 broadcaster(msg);
167 };
168 subscriber = node->create_subscription<MessageT>(topic_name, qos, callback, option);
169}
170
171template <class MessageT>
172inline RosSubscriberNode<MessageT>::RosSubscriberNode(const std::string& instance_name, const Config& config,
173 const Context& context, const rclcpp::QoS& qos)
174 : BT::ConditionNode{ instance_name, config }, logger_(context.getLogger()), context_{ context }, qos_{ qos }
175{
176 // check port remapping
177 auto portIt = config.input_ports.find("topic_name");
178 if (portIt != config.input_ports.end())
179 {
180 const std::string& bb_topic_name = portIt->second;
181
182 if (isBlackboardPointer(bb_topic_name))
183 {
184 // unknown value at construction time. postpone to tick
185 topic_name_should_be_checked_ = true;
186 }
187 else if (!bb_topic_name.empty())
188 {
189 // "hard-coded" name in the bb_topic_name. Use it.
190 createSubscriber(bb_topic_name);
191 }
192 }
193 // no port value or it is empty. Use the default port value
194 if (!sub_instance_ && !context.default_port_name.empty())
195 {
196 createSubscriber(context.default_port_name);
197 }
198}
199
200template <class MessageT>
201inline bool RosSubscriberNode<MessageT>::createSubscriber(const std::string& topic_name)
202{
203 if (topic_name.empty())
204 {
205 throw exceptions::RosNodeError(getFullName() + " - Argument topic_name is empty when trying to create a client.");
206 }
207 if (sub_instance_)
208 {
209 throw exceptions::RosNodeError(getFullName() + " - Cannot call createSubscriber() more than once.");
210 }
211
212 // find SubscriberInstance in the registry
213 std::unique_lock lk(registryMutex());
214
215 auto node = getRosContext().nh.lock();
216 if (!node)
217 {
218 throw exceptions::RosNodeError(getFullName() +
219 " - The shared pointer to the ROS node went out of scope. The tree node doesn't "
220 "take the ownership of the node.");
221 }
222 subscriber_key_ = std::string(node->get_fully_qualified_name()) + "/" + topic_name;
223
224 auto& registry = getRegistry();
225 auto it = registry.find(subscriber_key_);
226 if (it == registry.end() || it->second.expired())
227 {
228 sub_instance_ = std::make_shared<SubscriberInstance>(node, topic_name, qos_);
229 registry.insert({ subscriber_key_, sub_instance_ });
230 RCLCPP_DEBUG(logger_, "%s - Created subscriber for topic '%s'.", getFullName().c_str(), topic_name.c_str());
231 }
232 else
233 {
234 sub_instance_ = it->second.lock();
235 }
236
237 // Check if there was a message received before the creation of this subscriber action
238 if (sub_instance_->last_msg)
239 {
240 last_msg_ = sub_instance_->last_msg;
241 }
242
243 // add "this" as received of the broadcaster
244 signal_connection_ =
245 sub_instance_->broadcaster.connect([this](const std::shared_ptr<MessageT> msg) { last_msg_ = msg; });
246
247 topic_name_ = topic_name;
248 return true;
249}
250
251template <class MessageT>
253{
254 // First, check if the subscriber_ is valid and that the name of the
255 // topic_name in the port didn't change.
256 // otherwise, create a new subscriber
257 if (!sub_instance_ || (status() == BT::NodeStatus::IDLE && topic_name_should_be_checked_))
258 {
259 std::string topic_name;
260 getInput("topic_name", topic_name);
261 if (topic_name_ != topic_name)
262 {
263 createSubscriber(topic_name);
264 }
265 }
266
267 if (!sub_instance_)
268 {
269 throw exceptions::RosNodeError(getFullName() +
270 " - You must specify a service name either by using a default value or by "
271 "passing a value to the corresponding dynamic input port.");
272 }
273
274 auto check_status = [this](BT::NodeStatus status) {
275 if (!isStatusCompleted(status))
276 {
277 throw exceptions::RosNodeError(getFullName() + " - The callback must return either SUCCESS or FAILURE.");
278 }
279 return status;
280 };
281 sub_instance_->callback_group_executor.spin_some();
282 auto status = check_status(onTick(last_msg_));
283 last_msg_.reset();
284 return status;
285}
286
287template <class MessageT>
288inline BT::NodeStatus RosSubscriberNode<MessageT>::onTick(const std::shared_ptr<MessageT>& last_msg_ptr)
289{
290 if (!last_msg_ptr)
291 return BT::NodeStatus::FAILURE;
292 return onMessageReceived(*last_msg_ptr);
293}
294
295template <class MessageT>
296inline BT::NodeStatus RosSubscriberNode<MessageT>::onMessageReceived(const MessageT& /*msg*/)
297{
298 return BT::NodeStatus::SUCCESS;
299}
300
301template <class MessageT>
303{
304 // NOTE: registrationName() is empty during construction as this member is frist set after the factory constructed the
305 // object
306 if (registrationName().empty())
307 return name();
308 if (this->name() == this->registrationName())
309 return this->name();
310 return this->name() + " (Type: " + this->registrationName() + ")";
311}
312
313template <class MessageT>
315{
316 return context_;
317}
318
319template <class MessageT>
321{
322 static std::mutex sub_mutex;
323 return sub_mutex;
324}
325
326template <class MessageT>
327inline typename RosSubscriberNode<MessageT>::SubscribersRegistry& RosSubscriberNode<MessageT>::getRegistry()
328{
329 static SubscribersRegistry subscribers_registry;
330 return subscribers_registry;
331}
332
333} // namespace auto_apms_behavior_tree
Abstract class to wrap a Topic subscriber. Considering the example in the tutorial: https://docs....
virtual BT::NodeStatus onMessageReceived(const MessageT &msg)
Callback invoked when a new message is received. You must return either SUCCESS or FAILURE.
static BT::PortsList providedBasicPorts(BT::PortsList addition)
Any subclass of RosTopicNode that accepts parameters must provide a providedPorts method and call pro...
RosSubscriberNode(const std::string &instance_name, const Config &config, const Context &context, const rclcpp::QoS &qos={ 10 })
static BT::PortsList providedPorts()
Creates list of BT ports.
virtual BT::NodeStatus onTick(const std::shared_ptr< MessageT > &last_msg_ptr)
Callback invoked on every tick. You must return either SUCCESS or FAILURE.
std::string default_port_name
Default port name of the corresponding ROS 2 communication interface.