当前位置: 首页 > news >正文

【ROS2】中级-编写动作服务器和客户端(Python)

目标:用 Python 实现一个动作服务器和客户端。

教程级别:中级

 时间:15 分钟

 目录

  •  背景

  •  先决条件

  •  任务

    • 1. 编写动作服务器

    • 2. 编写动作客户端

  •  摘要

  •  相关内容

 背景

动作是 ROS 2 中异步通信的一种形式。动作客户端向动作服务器发送目标请求。动作服务器向动作客户端发送目标反馈和结果。

 先决条件

您将需要 custom_action_interfaces 包和在上一教程中定义的 Fibonacci.action 接口,创建一个操作。

 任务

1. 编写动作服务器

让我们专注于编写一个动作服务器,使用我们在创建动作教程中创建的动作来计算斐波那契序列。

直到现在,您已经创建了包并使用 ros2 run 来运行您的节点。然而,为了在本教程中保持简单,我们将把动作服务器限定在一个文件中。如果您想看看完整的动作教程包是什么样子,请查看 action_tutorials https://github.com/ros2/demos/tree/jazzy/action_tutorials 。

在您的home目录中打开一个新文件,我们称它为 fibonacci_action_server.py ,并添加以下代码:

import rclpy  # 导入ROS2的Python库
from rclpy.action import ActionServer  # 从rclpy.action模块导入ActionServer类
from rclpy.node import Node  # 从rclpy.node模块导入Node类from custom_action_interfaces.action import Fibonacci  # 导入自定义的Fibonacci动作接口class FibonacciActionServer(Node):  # 定义一个名为FibonacciActionServer的类,该类继承自Node类def __init__(self):  # 类的初始化函数super().__init__('fibonacci_action_server')  # 调用父类的初始化函数,创建一个名为'fibonacci_action_server'的节点self._action_server = ActionServer(  # 创建一个动作服务器self,  # 传入当前节点实例Fibonacci,  # 指定动作接口为Fibonacci'fibonacci',  # 动作的名字为'fibonacci'self.execute_callback)  # 当收到动作目标时,调用的回调函数为execute_callbackdef execute_callback(self, goal_handle):  # 定义动作执行的回调函数self.get_logger().info('Executing goal...')  # 打印日志信息result = Fibonacci.Result()  # 创建一个Fibonacci动作的结果实例return result  # 返回结果实例def main(args=None):  # 定义主函数rclpy.init(args=args)  # 初始化ROS2的Python库fibonacci_action_server = FibonacciActionServer()  # 创建一个FibonacciActionServer的实例rclpy.spin(fibonacci_action_server)  # 使fibonacci_action_server节点保持活动状态,直到节点被显式关闭或按下Ctrl+Cif __name__ == '__main__':  # 如果当前脚本被直接运行,而不是被导入为模块运行main()  # 调用主函数

第 8 行定义了一个类 FibonacciActionServer ,它是 Node 的子类。该类通过调用 Node 构造函数来初始化,将我们的节点命名为 fibonacci_action_server :

super().__init__('fibonacci_action_server')

在构造函数中,我们还实例化了一个新的动作服务器:

self._action_server = ActionServer(self,Fibonacci,'fibonacci',self.execute_callback)

一个动作服务器需要四个参数:

  1. 一个 ROS 2 节点用于添加动作客户端到: self 。

  2. 动作类型: Fibonacci (在第 5 行导入)。

  3. 动作名称: 'fibonacci' 。

  4. 用于执行已接受目标的回调函数: self.execute_callback 。此回调函数必须为动作类型返回一个结果消息。

我们还在我们的类中定义了一个 execute_callback 方法:

def execute_callback(self, goal_handle):self.get_logger().info('Executing goal...')result = Fibonacci.Result()return result

这是一旦目标被接受后将被调用来执行的方法。

让我们尝试运行我们的动作服务器:

python3 fibonacci_action_server.py

在另一个终端,我们可以使用命令行界面来发送一个目标:

ros2 action send_goal fibonacci custom_action_interfaces/action/Fibonacci "{order: 5}"

在运行动作服务器的终端中,您应该会看到一条记录消息“正在执行目标...”,随后是一个警告,提示目标状态未设置。默认情况下,如果在执行回调中未设置目标句柄状态,它会假定为中止状态。

我们可以在目标句柄上调用 succeed() 来表示目标已成功实现:

def execute_callback(self, goal_handle):self.get_logger().info('Executing goal...')goal_handle.succeed()result = Fibonacci.Result()return result

如果您现在重启动作服务器并发送另一个目标,您应该会看到目标以状态 SUCCEEDED 完成。

现在,让我们的目标执行实际计算并返回请求的斐波那契序列:

def execute_callback(self, goal_handle):self.get_logger().info('Executing goal...')sequence = [0, 1]for i in range(1, goal_handle.request.order):sequence.append(sequence[i] + sequence[i-1])goal_handle.succeed()result = Fibonacci.Result()result.sequence = sequencereturn result

计算序列后,我们在返回之前将其分配给结果消息字段。

再次重启动作服务器并发送另一个目标。你应该会看到目标以正确的结果序列结束。

1.2 发布反馈

动作的一个好处是在目标执行期间能够向动作客户端提供反馈。我们可以通过调用目标句柄的 publish_feedback() 方法,使我们的动作服务器为动作客户端发布反馈。

我们将替换 sequence 变量,并使用反馈消息来存储序列。在 for 循环中每次更新反馈消息后,我们发布反馈消息并暂停以产生戏剧性效果:

import time  # 导入Python的时间库import rclpy  # 导入ROS2的Python库
from rclpy.action import ActionServer  # 从rclpy.action模块导入ActionServer类
from rclpy.node import Node  # 从rclpy.node模块导入Node类from custom_action_interfaces.action import Fibonacci  # 导入自定义的Fibonacci动作接口class FibonacciActionServer(Node):  # 定义一个名为FibonacciActionServer的类,该类继承自Node类def __init__(self):  # 类的初始化函数super().__init__('fibonacci_action_server')  # 调用父类的初始化函数,创建一个名为'fibonacci_action_server'的节点self._action_server = ActionServer(  # 创建一个动作服务器self,  # 传入当前节点实例Fibonacci,  # 指定动作接口为Fibonacci'fibonacci',  # 动作的名字为'fibonacci'self.execute_callback)  # 当收到动作目标时,调用的回调函数为execute_callbackdef execute_callback(self, goal_handle):  # 定义动作执行的回调函数self.get_logger().info('Executing goal...')  # 打印日志信息feedback_msg = Fibonacci.Feedback()  # 创建一个Fibonacci动作的反馈信息实例feedback_msg.partial_sequence = [0, 1]  # 初始化反馈信息的部分序列为[0, 1]for i in range(1, goal_handle.request.order):  # 对于目标请求的序列长度进行循环feedback_msg.partial_sequence.append(  # 在部分序列后面添加新的元素feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1])  # 新的元素为部分序列的最后两个元素之和self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence))  # 打印反馈信息的部分序列goal_handle.publish_feedback(feedback_msg)  # 发布反馈信息time.sleep(1)  # 暂停1秒goal_handle.succeed()  # 动作目标执行成功result = Fibonacci.Result()  # 创建一个Fibonacci动作的结果实例result.sequence = feedback_msg.partial_sequence  # 将反馈信息的部分序列赋值给结果的序列return result  # 返回结果实例def main(args=None):  # 定义主函数rclpy.init(args=args)  # 初始化ROS2的Python库fibonacci_action_server = FibonacciActionServer()  # 创建一个FibonacciActionServer的实例rclpy.spin(fibonacci_action_server)  # 使fibonacci_action_server节点保持活动状态,直到节点被显式关闭或按下Ctrl+Cif __name__ == '__main__':  # 如果当前脚本被直接运行,而不是被导入为模块运行main()  # 调用主函数

重启动作服务器后,我们可以通过使用带有 --feedback 选项的命令行工具来确认现在已发布反馈:

ros2 action send_goal --feedback fibonacci custom_action_interfaces/action/Fibonacci "{order: 5}"

2. 编写动作客户端

我们还将将动作客户端限定在单个文件中。打开一个新文件,我们称之为 fibonacci_action_client.py ,并添加以下样板代码:

import rclpy  # 导入ROS2的Python库
from rclpy.action import ActionClient  # 从rclpy.action模块导入ActionClient类
from rclpy.node import Node  # 从rclpy.node模块导入Node类from custom_action_interfaces.action import Fibonacci  # 导入自定义的Fibonacci动作接口class FibonacciActionClient(Node):  # 定义一个名为FibonacciActionClient的类,该类继承自Node类def __init__(self):  # 类的初始化函数super().__init__('fibonacci_action_client')  # 调用父类的初始化函数,创建一个名为'fibonacci_action_client'的节点self._action_client = ActionClient(self, Fibonacci, 'fibonacci')  # 创建一个动作客户端,动作接口为Fibonacci,动作的名字为'fibonacci'def send_goal(self, order):  # 定义发送动作目标的函数goal_msg = Fibonacci.Goal()  # 创建一个Fibonacci动作的目标实例goal_msg.order = order  # 设置目标的序列长度为函数的参数self._action_client.wait_for_server()  # 等待动作服务器return self._action_client.send_goal_async(goal_msg)  # 异步发送动作目标,并返回结果的Future对象def main(args=None):  # 定义主函数rclpy.init(args=args)  # 初始化ROS2的Python库action_client = FibonacciActionClient()  # 创建一个FibonacciActionClient的实例future = action_client.send_goal(10)  # 发送动作目标,序列长度为10,返回结果的Future对象rclpy.spin_until_future_complete(action_client, future)  # 使action_client节点保持活动状态,直到Future对象完成if __name__ == '__main__':  # 如果当前脚本被直接运行,而不是被导入为模块运行main()  # 调用主函数

我们定义了一个类 FibonacciActionClient ,它是 Node 的子类。该类通过调用 Node 构造函数来初始化,将我们的节点命名为 fibonacci_action_client :

super().__init__('fibonacci_action_client')

在类构造函数中,我们还使用上一教程中的自定义动作定义创建了一个动作客户端:

self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

我们通过传递三个参数来创建一个 ActionClient

  1. 一个 ROS 2 节点用于添加动作客户端到: self

  2. 动作的类型: Fibonacci

  3.  动作名称: 'fibonacci'

我们的动作客户端将能够与具有相同动作名称和类型的动作服务器进行通信。

我们还在 FibonacciActionClient 类中定义了一个方法 send_goal :

def send_goal(self, order):goal_msg = Fibonacci.Goal()goal_msg.order = orderself._action_client.wait_for_server()return self._action_client.send_goal_async(goal_msg)

此方法等待动作服务器可用,然后向服务器发送一个目标。它返回一个我们稍后可以等待的未来。

在类定义之后,我们定义了一个函数 main() ,它初始化 ROS 2 并创建我们的 FibonacciActionClient 节点的实例。然后它发送一个目标,并等待直到该目标完成。

最后,我们在 Python 程序的入口点调用 main() 。

让我们通过首先运行之前构建的动作服务器来测试我们的动作客户端:

python3 fibonacci_action_server.py

在另一个终端中,运行动作客户端:

python3 fibonacci_action_client.py

您应该会看到动作服务器在成功执行目标时打印的消息:

[INFO] [fibonacci_action_server]: Executing goal...
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5])
# etc.

动作客户端应该启动,然后迅速完成。在这一点上,我们有一个功能正常的动作客户端,但我们看不到任何结果,也没有得到任何反馈。

2.1 获得结果

所以我们可以发送一个目标,但我们如何知道它何时完成?我们可以通过几个步骤获取结果信息。首先,我们需要为我们发送的目标获取一个目标句柄。然后,我们可以使用目标句柄来请求结果

这是此示例的完整代码:

# 导入 ROS 客户端库
import rclpy
# 导入 ROS 动作客户端
from rclpy.action import ActionClient
# 导入 ROS 节点
from rclpy.node import Node# 导入自定义的 Fibonacci 动作
from custom_action_interfaces.action import Fibonacci# 定义 Fibonacci 动作客户端类,继承自 Node
class FibonacciActionClient(Node):# 初始化函数def __init__(self):# 调用父类初始化函数,设置节点名为 'fibonacci_action_client'super().__init__('fibonacci_action_client')# 创建动作客户端,动作类型为 Fibonacci,动作名为 'fibonacci'self._action_client = ActionClient(self, Fibonacci, 'fibonacci')# 发送目标函数def send_goal(self, order):# 创建目标消息goal_msg = Fibonacci.Goal()# 设置目标消息的 order 字段goal_msg.order = order# 等待动作服务器self._action_client.wait_for_server()# 异步发送目标,并获取 future 对象self._send_goal_future = self._action_client.send_goal_async(goal_msg)# 为 future 对象添加完成回调self._send_goal_future.add_done_callback(self.goal_response_callback)# 目标响应回调函数def goal_response_callback(self, future):# 获取目标句柄goal_handle = future.result()# 如果目标未被接受if not goal_handle.accepted:# 记录信息:目标被拒绝self.get_logger().info('目标被拒绝 :(')return# 记录信息:目标被接受self.get_logger().info('目标被接受 :)')# 异步获取结果,并获取 future 对象self._get_result_future = goal_handle.get_result_async()# 为 future 对象添加完成回调self._get_result_future.add_done_callback(self.get_result_callback)# 获取结果回调函数def get_result_callback(self, future):# 获取结果result = future.result().result# 记录结果信息self.get_logger().info('结果: {0}'.format(result.sequence))# 关闭 ROSrclpy.shutdown()# 主函数
def main(args=None):# 初始化 ROSrclpy.init(args=args)# 创建 Fibonacci 动作客户端对象action_client = FibonacciActionClient()# 发送目标,目标值为 10action_client.send_goal(10)# 保持 ROS 运行,处理回调rclpy.spin(action_client)# 如果当前脚本为主程序,则运行主函数
if __name__ == '__main__':main()

ActionClient.send_goal_async() `方法返回一个对目标句柄的未来。首先我们注册一个回调,以便在未来完成时使用:

self._send_goal_future.add_done_callback(self.goal_response_callback)

请注意,当动作服务器接受或拒绝目标请求时,未来就完成了。让我们更详细地看看 goal_response_callback 。我们可以检查目标是否被拒绝,并提前返回,因为我们知道不会有结果:

def goal_response_callback(self, future):goal_handle = future.result()if not goal_handle.accepted:self.get_logger().info('Goal rejected :(')returnself.get_logger().info('Goal accepted :)')

现在我们有了一个目标句柄,我们可以使用它通过方法 get_result_async() 来请求结果。与发送目标类似,我们将获得一个在结果准备好时完成的未来。让我们注册一个回调,就像我们为目标响应所做的那样:

self._get_result_future = goal_handle.get_result_async()self._get_result_future.add_done_callback(self.get_result_callback)

在回调中,我们记录结果序列并关闭 ROS 2 以进行干净的退出:

def get_result_callback(self, future):result = future.result().resultself.get_logger().info('Result: {0}'.format(result.sequence))rclpy.shutdown()

在一个单独的终端中运行动作服务器后,继续尝试运行我们的斐波那契动作客户端!

python3 fibonacci_action_client.py

您应该会看到目标被接受和最终结果的记录消息。

 2.2 获取反馈

我们的动作客户端可以发送目标。太好了!但如果我们能从动作服务器那里得到一些我们发送的目标的反馈就更好了。

这是此示例的完整代码:

# 导入 ROS 客户端库
import rclpy
# 导入 ROS 动作客户端
from rclpy.action import ActionClient
# 导入 ROS 节点
from rclpy.node import Node# 导入自定义的 Fibonacci 动作
from custom_action_interfaces.action import Fibonacci# 定义 Fibonacci 动作客户端类,继承自 Node
class FibonacciActionClient(Node):# 初始化函数def __init__(self):# 调用父类初始化函数,设置节点名为 'fibonacci_action_client'super().__init__('fibonacci_action_client')# 创建动作客户端,动作类型为 Fibonacci,动作名为 'fibonacci'self._action_client = ActionClient(self, Fibonacci, 'fibonacci')# 发送目标函数def send_goal(self, order):# 创建目标消息goal_msg = Fibonacci.Goal()# 设置目标消息的 order 字段goal_msg.order = order# 等待动作服务器self._action_client.wait_for_server()# 异步发送目标,并获取 future 对象,同时设置反馈回调函数self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)# 为 future 对象添加完成回调self._send_goal_future.add_done_callback(self.goal_response_callback)# 目标响应回调函数def goal_response_callback(self, future):# 获取目标句柄goal_handle = future.result()# 如果目标未被接受if not goal_handle.accepted:# 记录信息:目标被拒绝self.get_logger().info('目标被拒绝 :(')return# 记录信息:目标被接受self.get_logger().info('目标被接受 :)')# 异步获取结果,并获取 future 对象self._get_result_future = goal_handle.get_result_async()# 为 future 对象添加完成回调self._get_result_future.add_done_callback(self.get_result_callback)# 获取结果回调函数def get_result_callback(self, future):# 获取结果result = future.result().result# 记录结果信息self.get_logger().info('结果: {0}'.format(result.sequence))# 关闭 ROSrclpy.shutdown()# 反馈回调函数def feedback_callback(self, feedback_msg):# 获取反馈feedback = feedback_msg.feedback# 记录反馈信息self.get_logger().info('收到反馈: {0}'.format(feedback.partial_sequence))# 主函数
def main(args=None):# 初始化 ROSrclpy.init(args=args)# 创建 Fibonacci 动作客户端对象action_client = FibonacciActionClient()# 发送目标,目标值为 10action_client.send_goal(10)# 保持 ROS 运行,处理回调rclpy.spin(action_client)# 如果当前脚本为主程序,则运行主函数
if __name__ == '__main__':main()

这是反馈消息的回调函数:

def feedback_callback(self, feedback_msg):feedback = feedback_msg.feedbackself.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))

在回调中,我们获取消息的反馈部分并将 partial_sequence 字段打印到屏幕上。

我们需要在动作客户端注册回调。这是通过在发送目标时额外传递回调给动作客户端来实现的:

self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

我们已经准备好了。如果我们运行我们的动作客户端,你应该会看到反馈被打印到屏幕上。

 摘要

在本教程中,您将逐行组合一个 Python 动作服务器和动作客户端,并配置它们以交换目标、反馈和结果。

 相关内容 

  • 在 Python 中编写动作服务器和客户端有几种方法;请查看 ros2/examples 仓库 https://github.com/ros2/examples/tree/jazzy/rclpy/actions 中的 minimal_action_server 和 minimal_action_client 包。

  • 有关 ROS 操作的更详细信息,请参阅设计文章 https://design.ros2.org/articles/actions.html 。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 从零手写实现 nginx-26-rewrite url 重写
  • javafx基础知识
  • 【RAG KG】GraphRAG开源:查询聚焦摘要的图RAG方法
  • 打卡第7天-----哈希表
  • Istio实战教程:Service Mesh部署与流量管理
  • AutoMQ 与蚂蚁数科达成战略合作
  • 机器学习——随机森林
  • WEB安全-文件上传漏洞
  • android 使用系统工具bootchart统计开机时长
  • 2-29 基于matlab的CEEMD
  • 【开发环境】搭建PX4+ROS2+MAVROS2+Simulink+Optitrack实物联合仿真环境
  • C# 中使用模式匹配 备忘
  • RT2-使用NLP的方式去训练机器人控制器
  • 半监督方案跟域自适应方案哪个更能提升目标检测泛化效果?
  • [高频 SQL 50 题(基础版)]第一千七百五十七题,可回收且低脂产品
  • [ JavaScript ] 数据结构与算法 —— 链表
  • [iOS]Core Data浅析一 -- 启用Core Data
  • 【跃迁之路】【477天】刻意练习系列236(2018.05.28)
  • ABAP的include关键字,Java的import, C的include和C4C ABSL 的import比较
  • Android优雅地处理按钮重复点击
  • ComponentOne 2017 V2版本正式发布
  • Electron入门介绍
  • github从入门到放弃(1)
  • HTTP那些事
  • interface和setter,getter
  • JavaScript工作原理(五):深入了解WebSockets,HTTP/2和SSE,以及如何选择
  • node-glob通配符
  • PV统计优化设计
  • 翻译--Thinking in React
  • 构建二叉树进行数值数组的去重及优化
  • 和 || 运算
  • 力扣(LeetCode)56
  • 前端性能优化——回流与重绘
  • 通过npm或yarn自动生成vue组件
  • 一个完整Java Web项目背后的密码
  • 终端用户监控:真实用户监控还是模拟监控?
  • 移动端高清、多屏适配方案
  • ​​​​​​​​​​​​​​Γ函数
  • ​字​节​一​面​
  • # 手柄编程_北通阿修罗3动手评:一款兼具功能、操控性的电竞手柄
  • #我与Java虚拟机的故事#连载01:人在JVM,身不由己
  • (11)MSP430F5529 定时器B
  • (Redis使用系列) Springboot 实现Redis消息的订阅与分布 四
  • (阿里云在线播放)基于SpringBoot+Vue前后端分离的在线教育平台项目
  • (超简单)使用vuepress搭建自己的博客并部署到github pages上
  • (二)测试工具
  • (理论篇)httpmoudle和httphandler一览
  • (三十)Flask之wtforms库【剖析源码上篇】
  • (算法)Travel Information Center
  • (一)插入排序
  • .NET 反射 Reflect
  • .NET 将多个程序集合并成单一程序集的 4+3 种方法
  • .NET 设计模式—简单工厂(Simple Factory Pattern)
  • .net 逐行读取大文本文件_如何使用 Java 灵活读取 Excel 内容 ?
  • .NET/C# 使用 ConditionalWeakTable 附加字段(CLR 版本的附加属性,也可用用来当作弱引用字典 WeakDictionary)