基础概念

  • 并发(Concurrency)意味着系统或软件可以同时运行许多任务,Concurrency
  • 线程(Thread)是可以由操作系统创建和管理的轻量级执行单元。每个线程都有自己的堆栈,并且可以与其他线程并发执行代码。

请添加图片描述

  • 例如,假设我们有一台单核 CPU 机器和两个并发执行的线程。操作系统将以快速的速度在两个线程之间切换,允许每个线程在切换到另一个线程之前执行一小段时间。此过程将重复,直到两个线程都完成其任务。我们将其称为并发机制。

执行器(Executor)的概念

  • ROS2 中的执行管理是通过执行器(Executor)的概念来解释的
  • Executor使用底层操作系统的一个或多个线程来调用传入消息和事件的订阅、计时器、服务服务器、操作服务器等的回调。
  • 我们来看一个普通的订阅节点
    import rclpy
    from rclpy.node import Node
    
    from std_msgs.msg import String
    
    
    class MinimalPublisher(Node):
    
        def __init__(self):
            super().__init__('minimal_publisher')
            self.publisher_ = self.create_publisher(String, 'topic', 10)
            self.timer = self.create_timer(0.5, self.timer_callback)
    
        def timer_callback(self):
            msg = String()
            msg.data = 'Hello World'
            self.publisher_.publish(msg)
    def main(args=None):
        rclpy.init(args=args)
    
        minimal_publisher = MinimalPublisher()
        rclpy.spin(minimal_publisher)
        minimal_publisher.destroy_node()
        rclpy.shutdown()
    
    
    if __name__ == '__main__':
        main()
    
  • 上述代码中
    rclpy.spin(minimal_publisher)
    
    • 实际上是对单线程执行器的实例化和调用
      executor=SingleThreadedExecutor()
      executor.add_node(minimal_publisher)
      executor.spin() # 和上述代码相同
      

执行者(Executor)的类型

  • 多线程执行器创建可配置数量的线程,以允许并行处理多个消息或事件。

  • 提供了三种 Executor 类型,派生自共享父类:
    请添加图片描述

  • SingleThreadedExecutor:就是默认使用的单线程执行器,同一时间下只能执行一个线程的任务,其他任务需要排队(阻塞式)

  • StaticSingleThreadedExecutor:在订阅、定时器、服务服务器、动作服务器等方面优化了扫描节点结构的运行时成本。它只在添加节点时执行一次扫描,而其他两个执行器定期执行扫描此类更改。因此,StaticSingleThreadedExecutor只能与在初始化期间创建所有订阅、计时器等的节点一起使用。

  • MultiThreadedExecutor:多线程执行器,本文要介绍的重点,但是直接使用它无法起到作用,这里还要引入回调组(Callback groups)的概念

回调组(Callback groups)的概念

  • ROS2 允许按组(groups)组织节点的回调。
  • 在rclpy中,通过调用特定回调组类型的构造函数来完成相同的操作。回调组必须在节点的整个执行过程中存储(例如作为类成员),否则执行器将无法触发回调。然后,可以在创建订阅、计时器等时指定此回调组 - 例如通过订阅选项:
    my_callback_group = MutuallyExclusiveCallbackGroup()
    my_subscription = self.create_subscription(Int32, "/topic", self.callback, qos_profile=1,callback_group=my_callback_group)
    
  • 在平常我们创建订阅、计时器等时没有制定回调组时,订阅、计时器会自动绑定默认的回调组(Node.default_callback_groups)

回调组(Callback groups)的类型

  • MutuallyExclusiveCallbackGroups: 该组的回调不能并行执行。也就同一时刻只有一个线程在运行。(默认的回调组),组中的回调由 SingleThreadedExecutor 执行一样。
    • 属于不同回调组(任何类型)的回调始终可以彼此并行执行。
      self.group1=MutuallyExclusiveCallbackGroup()
      self.group2=MutuallyExclusiveCallbackGroup()
      sub1= self.create_subscription(Int32, "/topic1", self.callback, qos_profile=1,callback_group=self.group1)
      sub2 = self.create_subscription(Int32, "/topic2", self.callback, qos_profile=1,callback_group=self.group2)
      executor=MultiThreadedExecutor(num_threads=3)
    
  • ReentrantCallbackGroups: 该组的回调可以并行执行。允许执行器以任何它认为合适的方式安排和执行组的回调,而没有任何限制。这意味着,除了不同的回调彼此并行运行之外,同一回调的不同实例也可以同时执行。
      self.group1=ReentrantCallbackGroups()
      sub1= self.create_subscription(Int32, "/topic1", self.callback, qos_profile=1,callback_group=self.group1)
      sub2 = self.create_subscription(Int32, "/topic2", self.callback, qos_profile=1,callback_group=self.group1)
      executor=MultiThreadedExecutor(num_threads=3)
    

历程

  • import rclpy
    from rclpy.node import Node
    from rclpy.callback_groups import ReentrantCallbackGroup
    from rclpy.executors import MultiThreadedExecutor
    class Test(Node):
       def __init__(self):
          super().__init__('test')
          self.group=ReentrantCallbackGroup()
          self.timer1=self.create_timer(0.1,self.timer1CB,callback_group=self.group)
          self.timer2=self.create_timer(1,self.timer2CB,callback_group=self.group)
          self.i=0
          self.executor=MultiThreadedExecutor(num_threads=3)
       def timer2CB(self):
          self.i+=1
       def timer1CB(self):
          print(self.i)
    def main(args=None):
      rclpy.init(args=args)
      test=Test()
      rclpy.spin(test)
      test.destory_node()
      rclpy.shutdown()
    if __name__=='__main__':
      main()
       ```
    

预告

*下期讲讲ROS2C++的多线程使用

参考文章

  • https://docs.ros.org/en/foxy/Tutorials/Beginner-Client-Libraries/Writing-A-Simple-Py-Publisher-And-Subscriber.html
  • https://docs.ros2.org/latest/api/rclpy/api/execution_and_callbacks.html
  • https://medium.com/@nullbyte.in/ros2-from-the-ground-up-part-5-concurrency-executors-and-callback-groups-c45900973fd2
  • https://docs.ros.org/en/foxy/Concepts/About-Executors.html
  • https://docs.ros.org/en/foxy/How-To-Guides/Using-callback-groups.html
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐