Multithreading behaviour with ROS AsyncSpinner

nebulant picture nebulant · Jan 29, 2018 · Viewed 8.5k times · Source

I am trying to understand how the AsyncSpinner from ROS really works because I may have something misunderstood. You can find a similar question here.

As seen here its definition mentions:

Asynchronous spinner: spawns a couple of threads (configurable) that will execute callbacks in parallel while not blocking the thread that called it. The start/stop method allows to control when the callbacks start being processed and when it should stop.

And in the official documentation here the AsyncSpinning is also remarked as a type of multi-threading Spinning.

So, said that, I have a very simple example with a publisher and subscriber with an AsyncSpinner to test the multi-threading behavior.

#include "ros/ros.h"
#include "std_msgs/String.h"

int main(int argc, char **argv)
{
  ros::init(argc, argv, "publisher");
  ros::NodeHandle nh;

  ros::Publisher chatter_pub = nh.advertise<std_msgs::String>("chatter", 1000);

  ros::Rate loop_rate(10);
  while (ros::ok())
  {
    std_msgs::String msg;
    msg.data = "hello world";

    chatter_pub.publish(msg);

    ros::spinOnce();

    loop_rate.sleep();
  }

  return 0;
}

And the subscriber where the spinner is defined and used:

#include "ros/ros.h"
#include "std_msgs/String.h"
#include <boost/thread.hpp>

int count = 0;

void chatterCallback(const std_msgs::String::ConstPtr& msg)
{
  count++;
  ROS_INFO("Subscriber %i callback: I heard %s", count, msg->data.c_str());
  sleep(1);
}

int main(int argc, char **argv)
{
  ros::init(argc, argv, "subscriber");
  ros::NodeHandle nh;

  ros::Subscriber sub = nh.subscribe("chatter", 1000, chatterCallback);

  ros::AsyncSpinner spinner(boost::thread::hardware_concurrency());
  ros::Rate r(10);

  spinner.start();
  ros::waitForShutdown();

  return 0;
}

When I run both programs I get the following output:

[ INFO] [1517215527.481856914]: Subscriber 1 callback: I heard hello world
[ INFO] [1517215528.482005146]: Subscriber 2 callback: I heard hello world
[ INFO] [1517215529.482204798]: Subscriber 3 callback: I heard hello world

As you can see the callback runs every second and no other callbacks are being called in parallel. I know that the global callback queue is being fulfilled because if I stop the publisher, the subscriber will keep popping the accumulated messages from the queue.

I know I should not block a callback but in the definition above is remarked that this will not stop the thread where it was called and I guess neither the others created by the spinner. Am I blocking the next callbacks just because I'm blocking the callback? Is there something I did misunderstood? I am bit confused and not able to demonstrate that the callbacks are running in parallel. Maybe you have another example?

Answer

Fruchtzwerg picture Fruchtzwerg · Jan 31, 2018

Short answer:

ROS callbacks are threadsafe by default. This means a registered callback can only be processed by one thread and concurrent calls are disabled. A second thread is not able to access the same callback at the same time.

If you register a second callback, you will see the spinner working like expected and multiple threads are calling your callbacks at the same time.

ros::Subscriber sub1 = nh.subscribe("chatter", 1000, chatterCallback);
ros::Subscriber sub2 = nh.subscribe("chatter", 1000, chatterCallback);

Extended answer:

An async spinner tries to call available callbacks in the callback queue as fast as the rate allows. If the callback is already in process (by an other thread) the CallResult is TryAgain. This means a new attempt will be started later on.

The implementation of this lock uses the variable allow_concurrent_callbacks_ which means this behaviour is optional.

Solution:

It is possible to allow concurrent calls by setting the correct SubscribeOptions.allow_concurrent_callbacks which is false by default. Therefore you need to define your own SubscribeOptions. Here is the code you need to subscribe and allow concurrent callback calls:

ros::SubscribeOptions ops;
ops.template init<std_msgs::String>("chatter", 1000, chatterCallback);
ops.transport_hints = ros::TransportHints();
ops.allow_concurrent_callbacks = true;
ros::Subscriber sub = nh.subscribe(ops);