一、Actionlib的简介
本期文章同大家分享下ros 中actionlib机制的使用。简单说,actionlib机制是功能更强大的service/client机制。更多信息,欢迎关注EAI微信公众号。
在某些情况下,服务需要很长时间才能执行完毕,这时用户往往希望获得执行过程中的状态反馈或中途取消任务。比如在机器人移动过程中,希望获得机器人当前的姿态、移动的距离等信息或立刻取消当前动作并执行下一动作,这时使用actionlib机制就变的非常简单。
二、Actionlib的使用
1、以EAI科技的D1底盘为例(系统环境:Ubuntu-14.04,ROS-indigo),介绍actionlib的使用方法及遇到的问题,提醒大家避免我们踩过的坑。
2、适合用户:了解ROS及linux基础操作。
3、目标:用actionlib 机制改写D1使用手册中前进一米、原地旋转360度的python脚本,使其在执行过程中实时反馈已前进距离或已旋转角度;同时,每接收到新任务,立刻停止当前任务并执行新任务。
4、总体思路:
1)在D1中添加action依赖;
2)编写service和client;
3)运行并查看反馈过程。
三、Actionlib的使用方法
1、在D1中添加action依赖
1)远程登录到D1(ssh eaibot@192.168.31.200,密码eaibot)
2)路径~/dashgo_ws/src/dashgo/dashgo_tools下, 创建“action”文件夹并在其中创建check_msg.action文本,包含如下内容(goal、result、feedback间用“---”分割,且类型是float32而不是float):
#goal definition
float32 goal # 前进距离或旋转角度
float32 vel # 线速度或角速度
float32 error # 容忍的误差
---
#result definition
bool issuccess # 是否成功
---
#feedback
float32 accomplished # 已前进距离或已旋转角度
3)在dashgo_tools包的(路径:~/dashgo_ws/src/dashgo/dashgo_tools)CMakeLists.txt和package.xml中添加依赖:
a)在CMakeLists.txt中添加(请先找到对应函数并修改,而不要简单复制):
find_package(catkin REQUIRED COMPONENTS
actionlib
actionlib_msgs
)
add_action_files(
DIRECTORY action
FILES check_msg.action
)
generate_messages(DEPENDENCIES actionlib_msgs std_msgs)
catkin_package(CATKIN_DEPENDS actionlib_msgs)
b)在package.xml中添加:
4)增编译 dashgo_ws(路径:~/dashgo_ws/):删除build和devel,然后catkin_make。
5)编写service和client:脚本请参考文末。
请将脚本(前进一米、旋转360度各有一对service和client,共四份)放入D1 路径:~/dashgo_ws/src/dashgo/dashgo_tools/scripts下。
6) 运行并查看反馈过程:
a)前进一米
rosrun dashgo_tools check_linear_action_server.py
rosrun dashgo_tools check_linear_action_client.py
查看反馈rostopic echo /dashgo_tools/feedback
b)旋转360度
rosrun dashgo_tools check_linear_action_server.py
rosrun dashgo_tools check_linear_action_client.py
查看反馈rostopic echo /dashgo_tools/feedback
脚本同时实现了D1接收到新命令后,停止执行当前命令,并执行新命令。
四、脚本
4.1.1、创建check_angular_action_client.py
#! /usr/bin/env python
from __future__ import print_function
import rospy
import actionlib
import dashgo_tools.msg
def linear_start():
print("loginfo: angular_start()")
# Creates the SimpleActionClient, passing the type of the action
client = actionlib.SimpleActionClient('angular_server_py', dashgo_tools.msg.check_msgAction)
print("loginfo: client created")
# Waits until the action server has started up and started
# listening for goals.
client.wait_for_server()
print("loginfo: server connected")
# Creates a goal to send to the action server.
_goal = dashgo_tools.msg.check_msgGoal(goal = 360, vel = 0.5, error = 1) # degrees, radians per second, degrees converted to radians
print("loginfo: goal created")
# Sends the goal to the action server.
client.send_goal(_goal)
print("loginfo: goal sent")
if __name__ == '__main__':
try:
print("angular_client_py begin")
# Initializes a rospy node so that the SimpleActionClient can
# publish and subscribe over ROS.
rospy.init_node('angular_client_py')
linear_start()
print("angular_client_py finished")
except rospy.ROSInterruptException:
print("angular_client_py faild")
4.1.2、创建check_angular_action_server.py
#!/usr/bin/env python
import rospy
from geometry_msgs.msg import Twist, Quaternion
from nav_msgs.msg import Odometry
import tf
from math import radians, copysign
import PyKDL
from math import pi
import actionlib
import dashgo_tools.msg
def quat_to_angle(quat):
rot = PyKDL.Rotation.Quaternion(quat.x, quat.y, quat.z, quat.w)
return rot.GetRPY()[2]
def normalize_angle(angle):
res = angle
while res > pi:
res -= 2.0 * pi
while res < -pi:
res += 2.0 * pi
return res
class CalibrateAngular():
# create messages that are used to publish feedback/result
_goal = dashgo_tools.msg.check_msgGoal()
_feedback = dashgo_tools.msg.check_msgFeedback()
_result = dashgo_tools.msg.check_msgResult()
def __init__(self, name):
print("loginfo: __init__()")
self._action_name = name
self._as = actionlib.SimpleActionServer(self._action_name, dashgo_tools.msg.check_msgAction, self.execute_cb, False)
self._as.start()
# Set rospy to execute a shutdown function when terminating the script
rospy.on_shutdown(self.shutdown)
def execute_cb(self, goal):
print("loginfo: execute_cb() called")
# How fast will we check the odometry values?
self.rate = rospy.get_param('~rate', 20)
r = rospy.Rate(self.rate)
self._goal.goal = goal.goal
self._goal.vel = goal.vel
self._goal.error = goal.error
# The test angle is 360 degrees
self.test_angle = radians(rospy.get_param('~test_angle', self._goal.goal))
self.speed = rospy.get_param('~speed', self._goal.vel) # radians per second
self.tolerance = radians(rospy.get_param('tolerance', self._goal.error)) # degrees converted to radians
self.odom_angular_scale_correction = rospy.get_param('~odom_angular_scale_correction', 1.0)
self.start_test = rospy.get_param('~start_test', True)
# Publisher to control the robot's speed
self.cmd_vel = rospy.Publisher('/cmd_vel', Twist, queue_size=5)
# The base frame is usually base_link or base_footprint
self.base_frame = rospy.get_param('~base_frame', '/base_footprint')
# The odom frame is usually just /odom
self.odom_frame = rospy.get_param('~odom_frame', '/odom')
# Initialize the tf listener
self.tf_listener = tf.TransformListener()
# Give tf some time to fill its buffer
rospy.sleep(2)
# Make sure we see the odom and base frames
self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60.0))
rospy.loginfo("Bring up rqt_reconfigure to control the test.")
reverse = 1
while not rospy.is_shutdown():
if self.start_test:
# Get the current rotation angle from tf
self.odom_angle = self.get_odom_angle()
last_angle = self.odom_angle
turn_angle = 0
self.test_angle *= reverse
error = self.test_angle - turn_angle
# Alternate directions between tests
reverse = -reverse
while abs(error) > self.tolerance and self.start_test:
if rospy.is_shutdown():
return
if self._as.is_preempt_requested():
rospy.loginfo('check_angular: Preempted')
self._as.set_preempted()
break
# Rotate the robot to reduce the error
move_cmd = Twist()
move_cmd.angular.z = copysign(self.speed, error)
self.cmd_vel.publish(move_cmd)
r.sleep()
# Get the current rotation angle from tf
self.odom_angle = self.get_odom_angle()
# Compute how far we have gone since the last measurement
delta_angle = self.odom_angular_scale_correction * normalize_angle(self.odom_angle - last_angle)
# Add to our total angle so far
turn_angle += delta_angle
self._feedback.accomplished = turn_angle * 57.2957795 # 180/pi
self._as.publish_feedback(self._feedback)
# Compute the new error
error = self.test_angle - turn_angle
# Store the current angle for the next comparison
last_angle = self.odom_angle
# Stop the robot
self.cmd_vel.publish(Twist())
# Update the status flag
self.start_test = False
params = {'start_test': False}
self._result.issuccess = True
rospy.loginfo('%s: Succeeded' % self._action_name)
self._as.set_succeeded(self._result)
break
rospy.sleep(0.5)
# Stop the robot
self.cmd_vel.publish(Twist())
def get_odom_angle(self):
# Get the current transform between the odom and base frames
try:
(trans, rot) = self.tf_listener.lookupTransform(self.odom_frame, self.base_frame, rospy.Time(0))
except (tf.Exception, tf.ConnectivityException, tf.LookupException):
rospy.loginfo("TF Exception")
return
# Convert the rotation from a quaternion to an Euler angle
return quat_to_angle(Quaternion(*rot))
def shutdown(self):
# Always stop the robot when shutting down the node
rospy.loginfo("Stopping the robot...")
self.cmd_vel.publish(Twist())
rospy.sleep(1)
if __name__ == '__main__':
print("")
print("angular_server_py begin")
rospy.init_node('angular_server_py')
server = CalibrateAngular(rospy.get_name())
rospy.spin()
4.2.1、创建check_linear_action_client.py
#! /usr/bin/env python
from __future__ import print_function
import rospy
import actionlib
import dashgo_tools.msg
def linear_start():
print("loginfo: linear_start()")
# Creates the SimpleActionClient, passing the type of the action
client = actionlib.SimpleActionClient('linear_server_py', dashgo_tools.msg.check_msgAction)
print("loginfo: client created")
# Waits until the action server has started up and started
# listening for goals.
client.wait_for_server()
print("loginfo: server connected")
# Creates a goal to send to the action server.
goal = dashgo_tools.msg.check_msgGoal(goal = 1.0, vel = 0.15, error = 0.01) # distance, velocity, error
print("loginfo: goal created")
# Sends the goal to the action server.
client.send_goal(goal)
print("loginfo: goal sent")
if __name__ == '__main__':
try:
print("linear_client_py begin")
# Initializes a rospy node so that the SimpleActionClient can
# publish and subscribe over ROS.
rospy.init_node('linear_client_py')
linear_start()
print("linear_client_py finished")
except rospy.ROSInterruptException:
print("linear_client_py faild")
2.2.2、创建check_linear_action_server.py
#! /usr/bin/env python
import rospy
from geometry_msgs.msg import Twist, Point
from math import copysign, sqrt, pow
import tf
import actionlib
import dashgo_tools.msg
class CalibrateLinear(object):
# create messages that are used to publish feedback/result
_goal = dashgo_tools.msg.check_msgGoal()
_feedback = dashgo_tools.msg.check_msgFeedback()
_result = dashgo_tools.msg.check_msgResult()
def __init__(self, name):
print("loginfo: __init__()")
self._action_name = name
self._as = actionlib.SimpleActionServer(self._action_name, dashgo_tools.msg.check_msgAction, self.execute_cb, False)
self._as.start()
# Set rospy to execute a shutdown function when terminating the script
rospy.on_shutdown(self.shutdown)
def execute_cb(self, goal):
print("loginfo: execute_cb() called")
# How fast will we check the odometry values?
self.rate = rospy.get_param('~rate', 20)
r = rospy.Rate(self.rate)
self._goal.goal = goal.goal
self._goal.vel = goal.vel
self._goal.error = goal.error
# Set the distance to travel
self.test_distance = rospy.get_param('~test_distance', self._goal.goal) # meters
self.speed = rospy.get_param('~speed', self._goal.vel) # meters per second
self.tolerance = rospy.get_param('~tolerance', self._goal.error) # meters
self.odom_linear_scale_correction = rospy.get_param('~odom_linear_scale_correction', 1.0)
self.start_test = rospy.get_param('~start_test', True)
# Publisher to control the robot's speed
self.cmd_vel = rospy.Publisher('/cmd_vel', Twist, queue_size=5)
# The base frame is base_footprint for the TurtleBot but base_link for Pi Robot
self.base_frame = rospy.get_param('~base_frame', '/base_footprint')
# The odom frame is usually just /odom
self.odom_frame = rospy.get_param('~odom_frame', '/odom')
# Initialize the tf listener
self.tf_listener = tf.TransformListener()
# Give tf some time to fill its buffer
rospy.sleep(2)
# Make sure we see the odom and base frames
self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60.0))
rospy.loginfo("Bring up rqt_reconfigure to control the test.")
self.position = Point()
# Get the starting position from the tf transform between the odom and base frames
self.position = self.get_position()
x_start = self.position.x
y_start = self.position.y
move_cmd = Twist()
while not rospy.is_shutdown():
# Stop the robot by default
move_cmd = Twist()
if self._as.is_preempt_requested():
rospy.loginfo('check_linear: Preempted')
self._as.set_preempted()
break
if self.start_test:
# Get the current position from the tf transform between the odom and base frames
self.position = self.get_position()
# Compute the Euclidean distance from the target point
distance = sqrt(pow((self.position.x - x_start), 2) +
pow((self.position.y - y_start), 2))
# Correct the estimated distance by the correction factor
distance *= self.odom_linear_scale_correction
# How close are we?
error = distance - self.test_distance
# Are we close enough?
if not self.start_test or abs(error) < self.tolerance:
self.start_test = False
params = {'start_test': False}
# rospy.loginfo(params)
self._result.issuccess = True
rospy.loginfo('%s: Succeeded' % self._action_name)
self._as.set_succeeded(self._result)
break
else:
# If not, move in the appropriate direction
move_cmd.linear.x = copysign(self.speed, -1 * error)
self._feedback.accomplished = distance
self._as.publish_feedback(self._feedback)
else:
self.position = self.get_position()
x_start = self.position.x
y_start = self.position.y
self.cmd_vel.publish(move_cmd)
r.sleep()
# Stop the robot
self.cmd_vel.publish(Twist())
def get_position(self):
# Get the current transform between the odom and base frames
try:
(trans, rot) = self.tf_listener.lookupTransform(self.odom_frame, self.base_frame, rospy.Time(0))
except (tf.Exception, tf.ConnectivityException, tf.LookupException):
rospy.loginfo("TF Exception")
return
return Point(*trans)
def shutdown(self):
# Always stop the robot when shutting down the node
rospy.loginfo("Stopping the robot...")
self.cmd_vel.publish(Twist())
rospy.sleep(1)
if __name__ == '__main__':
print("")
print("linear_server_py begin")
rospy.init_node('linear_server_py')
server = CalibrateLinear(rospy.get_name())
rospy.spin()
五、ROS官网
Actionlib 机制的详细介绍请参见官网: